MySql入门:备份恢复与安全管理
MySQL备份恢复与安全管理
数据是企业的核心资产,确保数据安全性和可恢复性是DBA最重要的职责。今天,我们将深入探讨MySQL的备份恢复策略和安全管理制度,帮助你构建既安全又可靠的数据库环境。
1. 备份策略与实施
逻辑备份:mysqldump实用技巧
基础备份命令:
# 完整数据库备份mysqldump -u root -p --all-databases --single-transaction --master-data=2 --flush-logs > full_backup_$(date +%Y%m%d).sql# 单个数据库备份mysqldump -u root -p --databases company --single-transaction --routines --triggers --events > company_backup_$(date +%Y%m%d).sql# 单个表备份mysqldump -u root -p company employees departments --single-transaction --where="salary>5000" > high_salary_employees.sql# 压缩备份mysqldump -u root -p --all-databases --single-transaction | gzip > full_backup_$(date +%Y%m%d).sql.gz高级备份选项:
# 生产环境完整备份脚本mysqldump -u backup_user -p'secure_password' \ --all-databases \ --single-transaction \ --master-data=2 \ --flush-logs \ --routines \ --triggers \ --events \ --hex-blob \ --complete-insert \ --extended-insert \ --max-allowed-packet=1G \ --set-gtid-purged=ON \ --result-file=/backup/full_backup_$(date +%Y%m%d_%H%M%S).sql# 分库备份脚本for DB in $(mysql -u root -p'password' -e "SHOW DATABASES;" | grep -Ev "(Database|information_schema|performance_schema|sys)")do mysqldump -u root -p'password' --databases $DB --single-transaction --routines --triggers > /backup/${DB}_backup_$(date +%Y%m%d).sqldone备份验证脚本:
#!/bin/bash# backup_verify.shBACKUP_FILE=$1LOG_FILE="/var/log/mysql/backup_verify.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}verify_backup() { local file=$1 log "开始验证备份文件: $file" # 检查文件是否存在 if [ ! -f "$file" ]; then log "错误: 备份文件不存在 - $file" return 1 fi # 检查文件大小 local file_size=$(stat -f%z "$file" 2>/dev/null || stat -c%s "$file" 2>/dev/null) if [ "$file_size" -lt 1024 ]; then log "错误: 备份文件过小 - $file" return 1 fi # 验证SQL文件完整性 if [[ "$file" == *.sql ]]; then # 检查SQL文件头 if ! head -n 10 "$file" | grep -q "MySQL dump"; then log "错误: 无效的SQL备份文件 - $file" return 1 fi # 检查SQL文件尾 if ! tail -n 5 "$file" | grep -q "Dump completed"; then log "警告: 备份文件可能不完整 - $file" fi fi # 验证压缩文件 if [[ "$file" == *.gz ]]; then if ! gzip -t "$file" 2>/dev/null; then log "错误: 压缩文件损坏 - $file" return 1 fi fi log "备份文件验证通过: $file" return 0}# 执行验证verify_backup "$BACKUP_FILE"exit $?物理备份:XtraBackup实战
完整备份与恢复:
# 安装XtraBackup# Ubuntu/Debianwget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.debsudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.debsudo apt-get updatesudo apt-get install percona-xtrabackup-80# 完整备份xtrabackup --backup --user=backup_user --password='secure_password' --target-dir=/backup/full_$(date +%Y%m%d_%H%M%S)# 准备备份(应用日志)xtrabackup --prepare --target-dir=/backup/full_20231201_120000# 恢复备份systemctl stop mysqlmv /var/lib/mysql /var/lib/mysql_oldxtrabackup --copy-back --target-dir=/backup/full_20231201_120000chown -R mysql:mysql /var/lib/mysqlsystemctl start mysql增量备份策略:
#!/bin/bash# incremental_backup.shBASE_DIR="/backup"FULL_BACKUP_DIR="$BASE_DIR/full_$(date +%Y%m%d)"INCREMENTAL_DIR="$BASE_DIR/inc_$(date +%Y%m%d_%H%M%S)"BACKUP_USER="backup_user"BACKUP_PASSWORD="secure_password"LOG_FILE="/var/log/mysql/xtrabackup.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}# 检查基础备份是否存在find_base_backup() { find $BASE_DIR -name "full_*" -type d | sort -r | head -1}perform_full_backup() { log "开始完整备份" xtrabackup --backup --user=$BACKUP_USER --password=$BACKUP_PASSWORD --target-dir=$FULL_BACKUP_DIR if [ $? -eq 0 ]; then log "完整备份完成: $FULL_BACKUP_DIR" echo $FULL_BACKUP_DIR > $BASE_DIR/latest_full_backup else log "完整备份失败" exit 1 fi}perform_incremental_backup() { local base_dir=$1 log "开始增量备份,基于: $base_dir" xtrabackup --backup --user=$BACKUP_USER --password=$BACKUP_PASSWORD \ --target-dir=$INCREMENTAL_DIR \ --incremental-basedir=$base_dir if [ $? -eq 0 ]; then log "增量备份完成: $INCREMENTAL_DIR" else log "增量备份失败" exit 1 fi}# 主逻辑BASE_BACKUP=$(find_base_backup)if [ -z "$BASE_BACKUP" ] || [ $(find $BASE_BACKUP -name "xtrabackup_checkpoints" -mtime +7 | wc -l) -gt 0 ]; then # 没有基础备份或基础备份超过7天,执行完整备份 perform_full_backupelse # 执行增量备份 perform_incremental_backup $BASE_BACKUPfi备份恢复演练:
#!/bin/bash# disaster_recovery_drill.shRECOVERY_DIR="/recovery"BACKUP_SOURCE="/backup"MYSQL_DATA_DIR="/var/lib/mysql"LOG_FILE="/var/log/mysql/recovery_drill.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}prepare_recovery_environment() { log "准备恢复环境" # 停止MySQL服务 systemctl stop mysql # 备份当前数据 mv $MYSQL_DATA_DIR ${MYSQL_DATA_DIR}_backup_$(date +%Y%m%d_%H%M%S) # 创建恢复目录 mkdir -p $RECOVERY_DIR}restore_from_backup() { local backup_dir=$1 log "从备份恢复: $backup_dir" # 准备备份 xtrabackup --prepare --apply-log-only --target-dir=$backup_dir # 恢复备份 xtrabackup --copy-back --target-dir=$backup_dir # 设置权限 chown -R mysql:mysql $MYSQL_DATA_DIR}verify_recovery() { log "验证恢复结果" # 启动MySQL systemctl start mysql # 等待服务启动 sleep 30 # 基础验证 if mysql -u root -p'password' -e "SELECT 1;" > /dev/null 2>&1; then log "MySQL服务启动成功" # 验证关键表 local table_count=$(mysql -u root -p'password' -N -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema NOT IN ('mysql','information_schema','performance_schema','sys');") log "发现 $table_count 个用户表" # 验证数据完整性 mysql -u root -p'password' -e "CHECK TABLE company.employees EXTENDED;" >> $LOG_FILE return 0 else log "MySQL服务启动失败" return 1 fi}# 执行恢复演练prepare_recovery_environment# 查找最新的完整备份LATEST_FULL_BACKUP=$(find $BACKUP_SOURCE -name "full_*" -type d | sort -r | head -1)if [ -n "$LATEST_FULL_BACKUP" ]; then restore_from_backup $LATEST_FULL_BACKUP verify_recoveryelse log "错误: 未找到完整备份" exit 1fi增量备份与差异备份
二进制日志备份:
-- 启用二进制日志-- 在my.cnf中配置/*[mysqld]log_bin = /var/lib/mysql/mysql-binexpire_logs_days = 7max_binlog_size = 100M*/-- 查看二进制日志状态SHOW BINARY LOGS;/*+------------------+-----------+| Log_name | File_size |+------------------+-----------+| mysql-bin.000001 | 194 || mysql-bin.000002 | 456 || mysql-bin.000003 | 123 |+------------------+-----------+*/-- 刷新日志(创建新的二进制日志文件)FLUSH BINARY LOGS;-- 查看当前正在使用的二进制日志SHOW MASTER STATUS;自动化二进制日志备份:
#!/bin/bash# binlog_backup.shMYSQL_USER="backup_user"MYSQL_PASSWORD="secure_password"BACKUP_DIR="/backup/binlog"LOG_FILE="/var/log/mysql/binlog_backup.log"RETENTION_DAYS=7log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}backup_binlog() { log "开始二进制日志备份" # 获取当前二进制日志文件 CURRENT_BINLOG=$(mysql -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SHOW MASTER STATUS" | awk '{print $1}') # 备份所有未备份的二进制日志 for BINLOG in $(mysql -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SHOW BINARY LOGS" | awk '{print $1}' | grep -v "$CURRENT_BINLOG"); do if [ ! -f "$BACKUP_DIR/$BINLOG" ]; then log "备份二进制日志: $BINLOG" cp /var/lib/mysql/$BINLOG $BACKUP_DIR/ # 验证备份 if cmp /var/lib/mysql/$BINLOG $BACKUP_DIR/$BINLOG; then log "备份验证成功: $BINLOG" else log "备份验证失败: $BINLOG" fi fi done}purge_old_backups() { log "清理过期备份(保留 $RETENTION_DAYS 天)" find $BACKUP_DIR -name "mysql-bin.*" -mtime +$RETENTION_DAYS -delete}# 创建备份目录mkdir -p $BACKUP_DIR# 执行备份backup_binlogpurge_old_backupslog "二进制日志备份完成"备份压缩与加密
加密备份方案:
#!/bin/bash# encrypted_backup.shBACKUP_DIR="/backup/encrypted"MYSQL_USER="backup_user"MYSQL_PASSWORD="secure_password"ENCRYPTION_KEY="/etc/mysql/backup.key"LOG_FILE="/var/log/mysql/encrypted_backup.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}generate_encryption_key() { if [ ! -f "$ENCRYPTION_KEY" ]; then log "生成加密密钥" openssl rand -base64 32 > $ENCRYPTION_KEY chmod 600 $ENCRYPTION_KEY fi}create_encrypted_backup() { local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).xb.enc" log "创建加密备份: $backup_file" # 使用XtraBackup创建备份并立即加密 xtrabackup --backup --user=$MYSQL_USER --password=$MYSQL_PASSWORD --stream=xbstream | \ openssl enc -aes-256-cbc -salt -pass file:$ENCRYPTION_KEY -out $backup_file if [ $? -eq 0 ]; then log "加密备份创建成功: $backup_file" else log "加密备份创建失败" exit 1 fi}verify_encrypted_backup() { local backup_file=$1 log "验证加密备份: $backup_file" # 尝试解密备份头信息 if openssl enc -aes-256-cbc -d -pass file:$ENCRYPTION_KEY -in $backup_file | head -c 100 | strings | grep -q "MySQL"; then log "加密备份验证成功" return 0 else log "加密备份验证失败" return 1 fi}# 主逻辑generate_encryption_keymkdir -p $BACKUP_DIRcreate_encrypted_backup# 验证最新的备份LATEST_BACKUP=$(ls -t $BACKUP_DIR/*.enc | head -1)if [ -n "$LATEST_BACKUP" ]; then verify_encrypted_backup $LATEST_BACKUPfi压缩备份优化:
#!/bin/bash# compressed_backup.shBACKUP_DIR="/backup/compressed"MYSQL_USER="backup_user"MYSQL_PASSWORD="secure_password"COMPRESSION_LEVEL=6 # 1-9,数字越大压缩率越高但速度越慢LOG_FILE="/var/log/mysql/compressed_backup.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}create_compressed_backup() { local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz" log "创建压缩备份 (级别: $COMPRESSION_LEVEL)" # 使用mysqldump和gzip创建压缩备份 mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction --routines --triggers --events | \ gzip -$COMPRESSION_LEVEL > $backup_file local backup_size=$(du -h $backup_file | cut -f1) log "压缩备份完成: $backup_file (大小: $backup_size)"}create_parallel_compressed_backup() { local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz" log "创建并行压缩备份" # 使用pigz进行并行压缩(如果可用) if command -v pigz >/dev/null 2>&1; then mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction | \ pigz -p 4 -$COMPRESSION_LEVEL > $backup_file else mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction | \ gzip -$COMPRESSION_LEVEL > $backup_file fi local backup_size=$(du -h $backup_file | cut -f1) log "并行压缩备份完成: $backup_file (大小: $backup_size)"}# 创建备份目录mkdir -p $BACKUP_DIR# 根据系统资源选择备份方式if [ $(nproc) -gt 2 ]; then create_parallel_compressed_backupelse create_compressed_backupfi云环境备份方案
AWS S3备份方案:
#!/bin/bash# s3_backup.shBACKUP_DIR="/backup/s3_upload"S3_BUCKET="my-company-mysql-backups"S3_PATH="mysql/$(hostname)"MYSQL_USER="backup_user"MYSQL_PASSWORD="secure_password"RETENTION_DAYS=30LOG_FILE="/var/log/mysql/s3_backup.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}create_backup() { local backup_file="$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql.gz" log "创建备份文件: $backup_file" mysqldump -u $MYSQL_USER -p$MYSQL_PASSWORD --all-databases --single-transaction --routines --triggers --events | \ gzip > $backup_file echo $backup_file}upload_to_s3() { local backup_file=$1 local s3_key="$S3_PATH/$(basename $backup_file)" log "上传到S3: s3://$S3_BUCKET/$s3_key" if aws s3 cp $backup_file s3://$S3_BUCKET/$s3_key; then log "S3上传成功" return 0 else log "S3上传失败" return 1 fi}cleanup_old_backups() { log "清理本地过期备份" find $BACKUP_DIR -name "*.sql.gz" -mtime +7 -delete log "清理S3过期备份" aws s3 ls s3://$S3_BUCKET/$S3_PATH/ | while read line; do create_date=$(echo $line | awk '{print $1" "$2}') create_date_epoch=$(date -d "$create_date" +%s) retention_epoch=$(date -d "$RETENTION_DAYS days ago" +%s) if [ $create_date_epoch -lt $retention_epoch ]; then file_name=$(echo $line | awk '{print $4}') aws s3 rm s3://$S3_BUCKET/$S3_PATH/$file_name log "删除过期S3备份: $file_name" fi done}verify_s3_backup() { local backup_file=$1 local s3_key="$S3_PATH/$(basename $backup_file)" log "验证S3备份完整性" # 下载备份文件 local temp_file="/tmp/verify_$(basename $backup_file)" aws s3 cp s3://$S3_BUCKET/$s3_key $temp_file # 比较本地和S3的文件 if cmp $backup_file $temp_file; then log "S3备份验证成功" rm $temp_file return 0 else log "S3备份验证失败" rm $temp_file return 1 fi}# 主逻辑mkdir -p $BACKUP_DIRBACKUP_FILE=$(create_backup)if [ -n "$BACKUP_FILE" ]; then if upload_to_s3 $BACKUP_FILE; then verify_s3_backup $BACKUP_FILE fificleanup_old_backups2. 数据恢复与灾难恢复
基于时间点的恢复(PITR)
PITR恢复流程:
#!/bin/bash# pitr_recovery.shRESTORE_TIME="2023-12-01 14:30:00"BACKUP_DIR="/backup"BINLOG_DIR="/var/lib/mysql"RECOVERY_DIR="/recovery"MYSQL_DATA_DIR="/var/lib/mysql"LOG_FILE="/var/log/mysql/pitr_recovery.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}find_relevant_backup() { log "查找适用于时间点 $RESTORE_TIME 的备份" # 查找在恢复时间之前的最新完整备份 for BACKUP in $(ls -t $BACKUP_DIR/full_* 2>/dev/null); do local backup_time=$(stat -c %y $BACKUP/xtrabackup_info | cut -d' ' -f1,2 | cut -d'.' -f1) local backup_epoch=$(date -d "$backup_time" +%s) local restore_epoch=$(date -d "$RESTORE_TIME" +%s) if [ $backup_epoch -le $restore_epoch ]; then echo $BACKUP return 0 fi done log "错误: 未找到合适的完整备份" exit 1}extract_binlog_events() { local start_time=$1 local stop_time=$2 local output_file=$3 log "提取二进制日志事件: $start_time 到 $stop_time" # 查找包含时间范围的二进制日志文件 for BINLOG in $(ls -tr $BINLOG_DIR/mysql-bin.* 2>/dev/null | grep -v '.index'); do local first_event_time=$(mysqlbinlog $BINLOG | grep -m1 "end_log_pos" | awk '{print $1, $2}' | tr -d '#') local last_event_time=$(mysqlbinlog $BINLOG | tail -10 | grep "end_log_pos" | tail -1 | awk '{print $1, $2}' | tr -d '#') if [ -n "$first_event_time" ] && [ -n "$last_event_time" ]; then local first_epoch=$(date -d "$first_event_time" +%s 2>/dev/null || echo 0) local last_epoch=$(date -d "$last_event_time" +%s 2>/dev/null || echo 0) local start_epoch=$(date -d "$start_time" +%s) local stop_epoch=$(date -d "$stop_time" +%s) if [ $last_epoch -ge $start_epoch ] && [ $first_epoch -le $stop_epoch ]; then log "处理二进制日志: $BINLOG" mysqlbinlog --start-datetime="$start_time" --stop-datetime="$stop_time" $BINLOG >> $output_file fi fi done}perform_pitr_recovery() { local base_backup=$1 log "执行时间点恢复" # 准备恢复环境 systemctl stop mysql mv $MYSQL_DATA_DIR ${MYSQL_DATA_DIR}_backup_$(date +%Y%m%d_%H%M%S) # 恢复基础备份 xtrabackup --copy-back --target-dir=$base_backup chown -R mysql:mysql $MYSQL_DATA_DIR # 启动MySQL到恢复模式 systemctl start mysql # 获取备份时间 local backup_time=$(stat -c %y $base_backup/xtrabackup_info | cut -d' ' -f1,2 | cut -d'.' -f1) # 提取和应用二进制日志 local binlog_events="/tmp/binlog_events.sql" echo "" > $binlog_events extract_binlog_events "$backup_time" "$RESTORE_TIME" $binlog_events # 应用二进制日志事件 if [ -s $binlog_events ]; then log "应用二进制日志事件" mysql -u root -p'password' < $binlog_events else log "没有需要应用的二进制日志事件" fi log "时间点恢复完成"}# 主逻辑BASE_BACKUP=$(find_relevant_backup)perform_pitr_recovery $BASE_BACKUP误操作数据恢复方案
Flashback工具使用:
-- 安装mysqlbinlog_flashback工具-- 使用my2sql或binlog2sql进行闪回-- 示例:恢复误删除的数据# 使用binlog2sql解析二进制日志python binlog2sql/binlog2sql.py -h127.0.0.1 -P3306 -uroot -p'password' -dcompany -temployees --start-file='mysql-bin.000001' --start-pos=4 --stop-pos=1000 -B-- 输出闪回SQL/*INSERT INTO `company`.`employees`(`create_time`, `phone`, `name`, `id`, `email`) VALUES ('2023-01-01 10:00:00', '13800138000', '张三', 1, 'zhangsan@company.com'); INSERT INTO `company`.`employees`(`create_time`, `phone`, `name`, `id`, `email`) VALUES ('2023-01-02 11:00:00', '13900139000', '李四', 2, 'lisi@company.com');*/基于备份的误操作恢复:
#!/bin/bash# point_in_time_restore.shDB_NAME="company"TABLE_NAME="employees"BACKUP_DIR="/backup"RESTORE_TIME="2023-12-01 10:00:00" # 误操作之前的时间LOG_FILE="/var/log/mysql/point_restore.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}create_restore_database() { local restore_db="${DB_NAME}_restore_$(date +%Y%m%d_%H%M%S)" log "创建恢复数据库: $restore_db" mysql -u root -p'password' -e "CREATE DATABASE $restore_db;" echo $restore_db}restore_table_to_point() { local restore_db=$1 local backup_file=$(find $BACKUP_DIR -name "*${DB_NAME}*" -type f | sort -r | head -1) if [ -z "$backup_file" ]; then log "错误: 未找到备份文件" exit 1 fi log "从备份恢复表结构" # 提取表结构 if [[ $backup_file == *.sql.gz ]]; then gunzip -c $backup_file | sed -n "/^-- Table structure for table \`$TABLE_NAME\`/,/^-- Table structure/p" | \ mysql -u root -p'password' $restore_db else sed -n "/^-- Table structure for table \`$TABLE_NAME\`/,/^-- Table structure/p" $backup_file | \ mysql -u root -p'password' $restore_db fi # 应用二进制日志到指定时间点 log "应用二进制日志到时间点: $RESTORE_TIME" local binlog_events="/tmp/binlog_events_$restore_db.sql" mysqlbinlog --database=$DB_NAME --stop-datetime="$RESTORE_TIME" /var/lib/mysql/mysql-bin.* | \ sed -n "/^### INSERT INTO \`$DB_NAME\`.\`$TABLE_NAME\`/,/^### INSERT INTO/p" | \ sed 's/^### //' > $binlog_events mysql -u root -p'password' $restore_db < $binlog_events rm $binlog_events log "表恢复完成: $restore_db.$TABLE_NAME"}compare_and_restore() { local restore_db=$1 log "比较并恢复数据" # 生成恢复SQL local restore_sql="/tmp/restore_data.sql" cat > $restore_sql << EOF-- 插入缺失的记录INSERT INTO $DB_NAME.$TABLE_NAME SELECT * FROM $restore_db.$TABLE_NAME rWHERE NOT EXISTS ( SELECT 1 FROM $DB_NAME.$TABLE_NAME c WHERE c.id = r.id);-- 更新被修改的记录UPDATE $DB_NAME.$TABLE_NAME cJOIN $restore_db.$TABLE_NAME r ON c.id = r.idSET c.name = r.name, c.email = r.email, c.phone = r.phone, c.updated_at = NOW()WHERE c.name != r.name OR c.email != r.email OR c.phone != r.phone;EOF mysql -u root -p'password' < $restore_sql rm $restore_sql log "数据恢复完成"}# 主逻辑RESTORE_DB=$(create_restore_database)restore_table_to_point $RESTORE_DBcompare_and_restore $RESTORE_DB# 清理恢复数据库mysql -u root -p'password' -e "DROP DATABASE $RESTORE_DB;"主从切换与数据重建
计划内主从切换:
#!/bin/bash# planned_failover.shCURRENT_MASTER="192.168.1.100"NEW_MASTER="192.168.1.101"MYSQL_USER="repl_user"MYSQL_PASSWORD="repl_password"LOG_FILE="/var/log/mysql/planned_failover.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}check_replication_health() { log "检查复制健康状况" # 检查主库 local master_status=$(mysql -h $CURRENT_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW MASTER STATUS\G") if [ $? -ne 0 ]; then log "错误: 无法连接主库 $CURRENT_MASTER" exit 1 fi # 检查从库延迟 local slave_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G") local seconds_behind=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}') if [ "$seconds_behind" != "0" ]; then log "警告: 从库有延迟 ($seconds_behind 秒)" read -p "是否继续? (y/n): " -n 1 -r echo if [[ ! $REPLY =~ ^[Yy]$ ]]; then exit 1 fi fi log "复制健康状况良好"}perform_failover() { log "开始主从切换" # 1. 设置原主库为只读 log "设置原主库为只读模式" mysql -h $CURRENT_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = ON;" # 2. 等待从库应用所有日志 log "等待从库应用所有日志" while true; do local slave_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G") local seconds_behind=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}') local io_running=$(echo "$slave_status" | grep "Slave_IO_Running" | awk '{print $2}') local sql_running=$(echo "$slave_status" | grep "Slave_SQL_Running" | awk '{print $2}') if [ "$seconds_behind" = "0" ] && [ "$io_running" = "Yes" ] && [ "$sql_running" = "Yes" ]; then break fi sleep 1 done # 3. 停止从库复制 log "停止新主库的复制" mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "STOP SLAVE;" # 4. 记录新主库的二进制日志位置 local new_master_status=$(mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SHOW MASTER STATUS\G") local new_master_file=$(echo "$new_master_status" | grep "File" | awk '{print $2}') local new_master_position=$(echo "$new_master_status" | grep "Position" | awk '{print $2}') # 5. 设置新主库为可写 log "设置新主库为可写模式" mysql -h $NEW_MASTER -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = OFF;" # 6. 配置其他从库指向新主库 log "重新配置其他从库" # 这里可以添加其他从库的重新配置逻辑 log "主从切换完成" log "新主库二进制日志位置: $new_master_file $new_master_position"}# 主逻辑check_replication_healthperform_failover灾难恢复演练
完整灾难恢复演练:
#!/bin/bash# disaster_recovery_test.shDR_SITE_MYSQL="192.168.2.100"BACKUP_SERVER="192.168.3.100"MYSQL_USER="dr_user"MYSQL_PASSWORD="dr_password"LOG_FILE="/var/log/mysql/dr_test.log"log() { echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> $LOG_FILE}verify_dr_environment() { log "验证灾备环境" # 检查网络连通性 if ! ping -c 3 $DR_SITE_MYSQL > /dev/null 2>&1; then log "错误: 无法连接到灾备MySQL服务器" return 1 fi # 检查MySQL服务 if ! mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "SELECT 1;" > /dev/null 2>&1; then log "错误: 灾备MySQL服务不可用" return 1 fi log "灾备环境验证通过" return 0}restore_to_dr_site() { log "开始恢复到灾备站点" # 1. 停止灾备站点MySQL服务 log "停止灾备站点MySQL服务" ssh root@$DR_SITE_MYSQL "systemctl stop mysql" # 2. 备份当前数据 log "备份灾备站点当前数据" ssh root@$DR_SITE_MYSQL "mv /var/lib/mysql /var/lib/mysql_backup_$(date +%Y%m%d_%H%M%S)" # 3. 从备份服务器获取最新备份 log "获取最新备份" local latest_backup=$(ssh root@$BACKUP_SERVER "ls -t /backup/full_* | head -1") if [ -z "$latest_backup" ]; then log "错误: 未找到备份文件" return 1 fi # 4. 传输备份到灾备站点 log "传输备份文件" scp -r root@$BACKUP_SERVER:$latest_backup /tmp/dr_restore/ # 5. 准备备份 log "准备备份" ssh root@$DR_SITE_MYSQL "xtrabackup --prepare --target-dir=/tmp/dr_restore/" # 6. 恢复备份 log "恢复备份" ssh root@$DR_SITE_MYSQL "xtrabackup --copy-back --target-dir=/tmp/dr_restore/" # 7. 设置权限并启动服务 log "启动MySQL服务" ssh root@$DR_SITE_MYSQL "chown -R mysql:mysql /var/lib/mysql && systemctl start mysql" log "灾备恢复完成"}verify_dr_data() { log "验证灾备数据" # 检查数据库列表 local db_count=$(mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -N -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema NOT IN ('mysql','information_schema','performance_schema','sys');") if [ "$db_count" -gt 0 ]; then log "数据验证成功: 发现 $db_count 个用户表" return 0 else log "数据验证失败: 未发现用户表" return 1 fi}perform_failover_test() { log "执行故障切换测试" # 模拟应用连接灾备数据库 local test_result=$(mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "CREATE DATABASE dr_test; USE dr_test; CREATE TABLE test_table (id INT); INSERT INTO test_table VALUES (1); SELECT * FROM test_table;" 2>&1) if echo "$test_result" | grep -q "1"; then log "故障切换测试成功" # 清理测试数据 mysql -h $DR_SITE_MYSQL -u $MYSQL_USER -p$MYSQL_PASSWORD -e "DROP DATABASE dr_test;" return 0 else log "故障切换测试失败" return 1 fi}# 主逻辑if verify_dr_environment; then restore_to_dr_site if verify_dr_data; then perform_failover_test fifi备份恢复监控告警
备份状态监控:
-- 创建备份监控表CREATE TABLE backup_monitor ( id BIGINT AUTO_INCREMENT PRIMARY KEY, backup_type ENUM('FULL', 'INCREMENTAL', 'BINLOG') NOT NULL, backup_file VARCHAR(500) NOT NULL, backup_size BIGINT, start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, end_time TIMESTAMP NULL, status ENUM('RUNNING', 'COMPLETED', 'FAILED') DEFAULT 'RUNNING', error_message TEXT, checksum VARCHAR(64));-- 创建备份告警表CREATE TABLE backup_alerts ( id BIGINT AUTO_INCREMENT PRIMARY KEY, alert_type VARCHAR(50) NOT NULL, alert_message TEXT NOT NULL, severity ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, resolved_at TIMESTAMP NULL, resolved_by VARCHAR(100));-- 备份状态检查存储过程DELIMITER //CREATE PROCEDURE CheckBackupStatus()BEGIN DECLARE last_full_backup TIMESTAMP; DECLARE backup_age_hours INT; DECLARE failed_backups INT; -- 检查最近完整备份的时间 SELECT MAX(start_time) INTO last_full_backup FROM backup_monitor WHERE backup_type = 'FULL' AND status = 'COMPLETED'; SET backup_age_hours = TIMESTAMPDIFF(HOUR, last_full_backup, NOW()); -- 如果超过24小时没有完整备份,发出告警 IF backup_age_hours > 24 THEN INSERT INTO backup_alerts (alert_type, alert_message, severity) VALUES ('BACKUP_MISSING', CONCAT('超过', backup_age_hours, '小时没有完整备份'), 'HIGH'); END IF; -- 检查失败的备份 SELECT COUNT(*) INTO failed_backups FROM backup_monitor WHERE status = 'FAILED' AND start_time > NOW() - INTERVAL 24 HOUR; IF failed_backups > 0 THEN INSERT INTO backup_alerts (alert_type, alert_message, severity) VALUES ('BACKUP_FAILED', CONCAT('过去24小时有', failed_backups, '个备份失败'), 'HIGH'); END IF; END //DELIMITER ;3. 安全与权限管理
用户权限体系设计
最小权限原则实施:
-- 创建应用用户(遵循最小权限原则)CREATE USER 'app_readonly'@'192.168.1.%' IDENTIFIED BY 'secure_password_123';GRANT SELECT ON company.* TO 'app_readonly'@'192.168.1.%';CREATE USER 'app_readwrite'@'192.168.1.%' IDENTIFIED BY 'secure_password_456';GRANT SELECT, INSERT, UPDATE, DELETE ON company.* TO 'app_readwrite'@'192.168.1.%';CREATE USER 'app_report'@'192.168.1.%' IDENTIFIED BY 'secure_password_789';GRANT SELECT ON company.employees TO 'app_report'@'192.168.1.%';GRANT SELECT ON company.departments TO 'app_report'@'192.168.1.%';-- 创建管理用户CREATE USER 'db_admin'@'localhost' IDENTIFIED BY 'admin_secure_password';GRANT ALL PRIVILEGES ON *.* TO 'db_admin'@'localhost' WITH GRANT OPTION;-- 创建备份用户CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'backup_secure_password';GRANT SELECT, RELOAD, PROCESS, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'backup_user'@'localhost';-- 查看用户权限SHOW GRANTS FOR 'app_readonly'@'192.168.1.%';数据库权限审计:
-- 创建权限审计表CREATE TABLE privilege_audit ( id BIGINT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(100) NOT NULL, host_pattern VARCHAR(100) NOT NULL, database_name VARCHAR(100), table_name VARCHAR(100), privilege_type VARCHAR(50) NOT NULL, granted_by VARCHAR(100), granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_revoked BOOLEAN DEFAULT FALSE, revoked_at TIMESTAMP NULL, revoked_by VARCHAR(100));-- 权限审计存储过程DELIMITER //CREATE PROCEDURE AuditUserPrivileges()BEGIN DECLARE done INT DEFAULT 0; DECLARE v_user, v_host, v_db, v_table, v_privilege VARCHAR(100); DECLARE cur CURSOR FOR SELECT User, Host, Db, Table_name, Privilege FROM information_schema.table_privileges; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; OPEN cur; read_loop: LOOP FETCH cur INTO v_user, v_host, v_db, v_table, v_privilege; IF done THEN LEAVE read_loop; END IF; -- 检查权限是否已经记录 IF NOT EXISTS ( SELECT 1 FROM privilege_audit WHERE username = v_user AND host_pattern = v_host AND database_name = v_db AND table_name = v_table AND privilege_type = v_privilege AND is_revoked = FALSE ) THEN -- 记录新权限 INSERT INTO privilege_audit (username, host_pattern, database_name, table_name, privilege_type) VALUES (v_user, v_host, v_db, v_table, v_privilege); END IF; END LOOP; CLOSE cur; -- 标记已撤销的权限 UPDATE privilege_audit pa LEFT JOIN information_schema.table_privileges tp ON pa.username = tp.User AND pa.host_pattern = tp.Host AND pa.database_name = tp.Db AND pa.table_name = tp.Table_name AND pa.privilege_type = tp.Privilege SET pa.is_revoked = TRUE, pa.revoked_at = CURRENT_TIMESTAMP WHERE pa.is_revoked = FALSE AND tp.User IS NULL; END //DELIMITER ;角色管理与权限继承
MySQL 8.0角色管理:
-- 创建角色CREATE ROLE read_only_role;CREATE ROLE read_write_role;CREATE ROLE dba_role;-- 为角色分配权限GRANT SELECT ON company.* TO read_only_role;GRANT SELECT, INSERT, UPDATE, DELETE ON company.* TO read_write_role;GRANT ALL PRIVILEGES ON *.* TO dba_role;-- 创建用户并分配角色CREATE USER 'report_user'@'%' IDENTIFIED BY 'report_password';CREATE USER 'app_user'@'%' IDENTIFIED BY 'app_password';CREATE USER 'admin_user'@'localhost' IDENTIFIED BY 'admin_password';-- 分配角色给用户GRANT read_only_role TO 'report_user'@'%';GRANT read_write_role TO 'app_user'@'%';GRANT dba_role TO 'admin_user'@'localhost';-- 设置默认角色SET DEFAULT ROLE read_only_role TO 'report_user'@'%';SET DEFAULT ROLE read_write_role TO 'app_user'@'%';SET DEFAULT ROLE dba_role TO 'admin_user'@'localhost';-- 激活角色SET ROLE ALL;-- 查看角色权限SHOW GRANTS FOR 'report_user'@'%' USING read_only_role;-- 创建层次化角色CREATE ROLE junior_dba;CREATE ROLE senior_dba;GRANT junior_dba TO senior_dba;GRANT SELECT, INSERT, UPDATE, DELETE ON mysql.* TO junior_dba;GRANT ALL PRIVILEGES ON *.* TO senior_dba;动态权限管理:
-- 创建存储过程管理用户权限DELIMITER //CREATE PROCEDURE ManageUserAccess( IN p_username VARCHAR(100), IN p_host_pattern VARCHAR(100), IN p_database_name VARCHAR(100), IN p_action ENUM('GRANT_READ', 'GRANT_WRITE', 'REVOKE_ACCESS'))BEGIN DECLARE user_exists INT; -- 检查用户是否存在 SELECT COUNT(*) INTO user_exists FROM mysql.user WHERE User = p_username AND Host = p_host_pattern; IF user_exists = 0 THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '用户不存在'; END IF; CASE p_action WHEN 'GRANT_READ' THEN SET @grant_sql = CONCAT('GRANT SELECT ON ', p_database_name, '.* TO ''', p_username, '''@''', p_host_pattern, ''''); PREPARE stmt FROM @grant_sql; EXECUTE stmt; DEALLOCATE PREPARE stmt; -- 记录权限变更 INSERT INTO privilege_audit (username, host_pattern, database_name, privilege_type) VALUES (p_username, p_host_pattern, p_database_name, 'SELECT'); WHEN 'GRANT_WRITE' THEN SET @grant_sql = CONCAT('GRANT SELECT, INSERT, UPDATE, DELETE ON ', p_database_name, '.* TO ''', p_username, '''@''', p_host_pattern, ''''); PREPARE stmt FROM @grant_sql; EXECUTE stmt; DEALLOCATE PREPARE stmt; INSERT INTO privilege_audit (username, host_pattern, database_name, privilege_type) VALUES (p_username, p_host_pattern, p_database_name, 'READ_WRITE'); WHEN 'REVOKE_ACCESS' THEN SET @revoke_sql = CONCAT('REVOKE ALL PRIVILEGES ON ', p_database_name, '.* FROM ''', p_username, '''@''', p_host_pattern, ''''); PREPARE stmt FROM @revoke_sql; EXECUTE stmt; DEALLOCATE PREPARE stmt; UPDATE privilege_audit SET is_revoked = TRUE, revoked_at = NOW() WHERE username = p_username AND host_pattern = p_host_pattern AND database_name = p_database_name AND is_revoked = FALSE; END CASE; END //DELIMITER ;数据加密:透明加密与列加密
InnoDB表空间加密:
-- 安装密钥环组件(MySQL 8.0)INSTALL COMPONENT "file://component_keyring_file";SET GLOBAL keyring_file_data = '/var/lib/mysql-keyring/keyring';-- 创建加密表空间CREATE TABLESPACE encrypted_ts ADD DATAFILE 'encrypted_ts.ibd' ENGINE=InnoDBENCRYPTION='Y';-- 在加密表空间中创建表CREATE TABLE sensitive_data ( id INT PRIMARY KEY, secret_data VARCHAR(500)) TABLESPACE encrypted_ts;-- 加密现有表ALTER TABLE existing_sensitive_table ENCRYPTION='Y';-- 查看加密状态SELECT TABLE_SCHEMA, TABLE_NAME, CREATE_OPTIONSFROM information_schema.TABLES WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';列级加密:
-- 创建加密函数DELIMITER //CREATE FUNCTION aes_encrypt(data TEXT, key_str VARCHAR(255))RETURNS VARBINARY(500)DETERMINISTICBEGIN RETURN AES_ENCRYPT(data, key_str);END //CREATE FUNCTION aes_decrypt(encrypted_data VARBINARY(500), key_str VARCHAR(255))RETURNS TEXTDETERMINISTICBEGIN RETURN AES_DECRYPT(encrypted_data, key_str);END //DELIMITER ;-- 创建存储加密数据的表CREATE TABLE user_secrets ( user_id INT PRIMARY KEY, -- 加密存储的敏感数据 ssn VARBINARY(500), credit_card VARBINARY(500), medical_info VARBINARY(500), -- 加密密钥(在实际应用中应该安全存储) encryption_key VARCHAR(255) DEFAULT 'default_encryption_key');-- 插入加密数据INSERT INTO user_secrets (user_id, ssn, credit_card)VALUES ( 1, aes_encrypt('123-45-6789', 'user1_key'), aes_encrypt('4111111111111111', 'user1_key'));-- 查询解密数据SELECT user_id, aes_decrypt(ssn, 'user1_key') as decrypted_ssn, aes_decrypt(credit_card, 'user1_key') as decrypted_credit_cardFROM user_secrets WHERE user_id = 1;密钥管理策略:
-- 创建密钥管理表CREATE TABLE encryption_keys ( key_id VARCHAR(100) PRIMARY KEY, key_value VARBINARY(500) NOT NULL, key_type ENUM('COLUMN', 'TABLE', 'BACKUP') NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_by VARCHAR(100), is_active BOOLEAN DEFAULT TRUE, rotated_at TIMESTAMP NULL);-- 密钥轮换存储过程DELIMITER //CREATE PROCEDURE RotateEncryptionKey( IN p_key_id VARCHAR(100), IN p_new_key_value VARBINARY(500))BEGIN DECLARE old_key_value VARBINARY(500); DECLARE done INT DEFAULT 0; DECLARE v_user_id INT; DECLARE v_ssn, v_credit_card VARBINARY(500); -- 获取旧密钥 SELECT key_value INTO old_key_value FROM encryption_keys WHERE key_id = p_key_id AND is_active = TRUE; IF old_key_value IS NULL THEN SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '未找到活动的密钥'; END IF; -- 使用游标处理所有需要重新加密的数据 DECLARE cur CURSOR FOR SELECT user_id, ssn, credit_card FROM user_secrets; DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1; OPEN cur; read_loop: LOOP FETCH cur INTO v_user_id, v_ssn, v_credit_card; IF done THEN LEAVE read_loop; END IF; -- 解密并使用新密钥重新加密 UPDATE user_secrets SET ssn = aes_encrypt(aes_decrypt(v_ssn, old_key_value), p_new_key_value), credit_card = aes_encrypt(aes_decrypt(v_credit_card, old_key_value), p_new_key_value) WHERE user_id = v_user_id; END LOOP; CLOSE cur; -- 停用旧密钥,激活新密钥 UPDATE encryption_keys SET is_active = FALSE, rotated_at = NOW() WHERE key_id = p_key_id; INSERT INTO encryption_keys (key_id, key_value, key_type) VALUES (p_key_id, p_new_key_value, 'COLUMN'); END //DELIMITER ;审计日志与安全监控
MySQL企业版审计:
-- 安装审计插件(企业版)INSTALL PLUGIN audit_log SONAME 'audit_log.so';-- 配置审计日志(在my.cnf中)/*[mysqld]audit_log_format=JSONaudit_log_file=/var/log/mysql/audit.logaudit_log_policy=ALLaudit_log_rotate_on_size=100000000audit_log_rotations=5*/-- 查看审计日志状态SHOW VARIABLES LIKE 'audit_log%';-- 查询审计日志SELECT JSON_EXTRACT(audit_record, '$.timestamp') as timestamp, JSON_EXTRACT(audit_record, '$.class') as event_class, JSON_EXTRACT(audit_record, '$.event') as event_type, JSON_EXTRACT(audit_record, '$.connection_id') as connection_id, JSON_EXTRACT(audit_record, '$.user') as user, JSON_EXTRACT(audit_record, '$.query') as queryFROM mysql.audit_log WHERE JSON_EXTRACT(audit_record, '$.query') IS NOT NULLORDER BY timestamp DESC LIMIT 10;社区版审计方案:
-- 使用通用日志实现基础审计SET GLOBAL general_log = 1;SET GLOBAL log_output = 'TABLE';-- 创建自定义审计表CREATE TABLE custom_audit_log ( id BIGINT AUTO_INCREMENT PRIMARY KEY, event_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, user_host VARCHAR(200) NOT NULL, thread_id BIGINT NOT NULL, server_id INT NOT NULL, command_type VARCHAR(64) NOT NULL, argument TEXT NOT NULL, client_ip VARCHAR(45), database_name VARCHAR(100), execution_time DECIMAL(10,6), rows_affected INT);-- 审计触发器示例DELIMITER //CREATE TRIGGER audit_user_changesAFTER INSERT ON mysql.userFOR EACH ROWBEGIN INSERT INTO custom_audit_log (user_host, thread_id, server_id, command_type, argument, client_ip) VALUES (USER(), CONNECTION_ID(), @@server_id, 'CREATE_USER', CONCAT('Created user: ', NEW.User, '@', NEW.Host), SUBSTRING_INDEX(USER(), '@', -1));END //CREATE TRIGGER audit_privilege_changesAFTER INSERT ON mysql.dbFOR EACH ROWBEGIN INSERT INTO custom_audit_log (user_host, thread_id, server_id, command_type, argument, database_name) VALUES (USER(), CONNECTION_ID(), @@server_id, 'GRANT_PRIVILEGE', CONCAT('Granted privileges on ', NEW.Db, ' to ', NEW.User), NEW.Db);END //DELIMITER ;安全监控仪表板:
-- 创建安全监控视图CREATE VIEW security_dashboard ASSELECT 'Failed Logins' as metric_name, COUNT(*) as metric_value, MAX(event_time) as last_occurrenceFROM custom_audit_logWHERE argument LIKE '%Access denied%' AND event_time > NOW() - INTERVAL 1 HOURUNION ALLSELECT 'New Users Created' as metric_name, COUNT(*) as metric_value, MAX(event_time) as last_occurrenceFROM custom_audit_logWHERE command_type = 'CREATE_USER' AND event_time > NOW() - INTERVAL 24 HOURUNION ALLSELECT 'Privilege Changes' as metric_name, COUNT(*) as metric_value, MAX(event_time) as last_occurrenceFROM custom_audit_logWHERE command_type IN ('GRANT_PRIVILEGE', 'REVOKE_PRIVILEGE') AND event_time > NOW() - INTERVAL 24 HOURUNION ALLSELECT 'Sensitive Data Access' as metric_name, COUNT(*) as metric_value, MAX(event_time) as last_occurrenceFROM custom_audit_logWHERE argument LIKE '%user_secrets%' AND event_time > NOW() - INTERVAL 1 HOUR;SQL注入防护与安全开发
预处理语句使用:
-- 不安全的查询(容易SQL注入)SET @user_input = "1'; DROP TABLE users; --";SET @sql = CONCAT("SELECT * FROM users WHERE id = '", @user_input, "'");PREPARE stmt FROM @sql;EXECUTE stmt;-- 安全的预处理语句PREPARE safe_stmt FROM "SELECT * FROM users WHERE id = ?";SET @user_id = "1";EXECUTE safe_stmt USING @user_id;-- 存储过程参数化查询DELIMITER //CREATE PROCEDURE GetUserByEmail(IN p_email VARCHAR(255))BEGIN -- 直接使用参数,避免拼接 SELECT * FROM users WHERE email = p_email;END //DELIMITER ;输入验证函数:
DELIMITER //CREATE FUNCTION ValidateEmail(email VARCHAR(255))RETURNS BOOLEANDETERMINISTICBEGIN -- 简单的邮箱格式验证 IF email REGEXP '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,4}$' THEN RETURN TRUE; ELSE RETURN FALSE; END IF;END //CREATE FUNCTION SanitizeInput(input_text TEXT)RETURNS TEXTDETERMINISTICBEGIN -- 移除潜在的SQL注入字符 SET input_text = REPLACE(input_text, "'", "''"); SET input_text = REPLACE(input_text, ";", ""); SET input_text = REPLACE(input_text, "--", ""); SET input_text = REPLACE(input_text, "/*", ""); SET input_text = REPLACE(input_text, "*/", ""); RETURN input_text;END //DELIMITER ;安全开发规范检查:
-- 检查存储过程的安全问题SELECT ROUTINE_NAME, ROUTINE_DEFINITIONFROM information_schema.ROUTINESWHERE ROUTINE_DEFINITION LIKE '%CONCAT(%' OR ROUTINE_DEFINITION LIKE '%EXECUTE%IMMEDIATE%' OR ROUTINE_DEFINITION LIKE '%PREPARE%' OR ROUTINE_DEFINITION LIKE '%sp_executesql%';-- 查找可能包含动态SQL的代码SELECT TABLE_NAME, COLUMN_NAMEFROM information_schema.COLUMNSWHERE TABLE_SCHEMA = 'your_database' AND (COLUMN_NAME LIKE '%sql%' OR COLUMN_NAME LIKE '%query%') AND TABLE_NAME NOT LIKE '%audit%';总结
通过本篇的深入学习,我们掌握了MySQL备份恢复和安全管理的完整体系:
- 备份策略:逻辑备份、物理备份、增量备份的实战应用
- 恢复技术:时间点恢复、误操作恢复、灾难恢复的完整流程
- 安全管理:权限体系、数据加密、审计监控的全面方案
- 安全开发:SQL注入防护、输入验证的安全编码实践
关键安全原则:
- 最小权限:用户只拥有完成工作所需的最小权限
- 纵深防御:多层安全措施,避免单点失效
- 定期审计:持续监控和审查安全状态
- 应急准备:完善的备份和恢复预案
备份恢复最佳实践:
- 3-2-1规则:3个副本,2种介质,1个离线存储
- 定期恢复演练:确保备份可用性
- 监控备份状态:及时发现问题
- 加密敏感数据:保护数据隐私
在下一篇中,我们将探讨MySQL在云原生环境中的应用,包括容器化部署、微服务架构集成等现代技术。
动手练习:
- 设计并实施完整的备份策略,包括完整备份和增量备份
- 执行时间点恢复演练,验证备份的可用性
- 建立权限管理体系,实施最小权限原则
- 配置数据加密和审计日志,增强安全性
- 进行安全代码审查,修复潜在的SQL注入漏洞
欢迎在评论区分享你的备份恢复实践和安全加固经验!


