a-cloud-all/.devops/monitor.py

699 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git 仓库监听器
监听多个 Git 仓库的指定分支,检测到新提交时触发部署
"""
import os
import sys
import time
import yaml
import subprocess
import socket
from datetime import datetime
from pathlib import Path
# 添加当前目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 导入自定义模块
from scripts.log import Logger
from scripts import docker, maven, npm
from scripts.init import mysql, redis, nacos
from scripts.dingtalk import DingTalkNotifier
class GitMonitor:
    """Poll Git repositories and run the deployment pipeline on changes.

    The monitor watches the configured branch (and optionally tags matching
    a glob pattern) of every repository listed in the YAML config. When a
    new commit or tag is seen it updates the main repository checkout,
    makes sure the infrastructure containers exist, builds the project and
    redeploys its docker-compose service. DingTalk notifications are sent
    when a notifier is configured.
    """

    # Polling interval (seconds) used when the config has no
    # monitor.poll_interval entry.
    DEFAULT_POLL_INTERVAL = 60

    def __init__(self, config_path='.devops/config.yaml'):
        """Load the configuration and prepare paths and the notifier.

        Args:
            config_path: Path of the YAML configuration file.

        Note:
            The process exits with status 1 when the configuration cannot
            be loaded or the paths cannot be initialised.
        """
        self.config_path = config_path
        self.config = None
        self.last_commits = {}  # repo name -> last seen branch head hash
        self.last_tags = {}  # repo name -> {tag name: hash} snapshot
        self.global_branch = 'main'
        self.project_root = None
        self.runtime_path = None
        self.dingtalk_notifier = None
        self.watch_tags = False
        self.tag_pattern = "v*"

        self._print_startup_banner()
        self._load_config()
        self._init_paths()
        self._init_dingtalk()

    # ------------------------------------------------------------------
    # Start-up
    # ------------------------------------------------------------------
    def _print_startup_banner(self):
        """Print the start-up banner with the current local time."""
        print("\n")
        Logger.separator()
        print(" RuoYi Cloud DevOps 自动化部署系统")
        Logger.separator()
        print(f"启动时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        Logger.separator()
        print("\n")

    def _load_config(self):
        """Read the YAML config and apply branch, tag and logging settings.

        Exits the process with status 1 on any failure.
        """
        Logger.info(f"[步骤 1/3] 读取配置文件: {self.config_path}")
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = yaml.safe_load(f)
            self.global_branch = self.config.get('global_branch', 'main')

            # Tag watching settings.
            monitor_config = self.config.get('monitor', {})
            self.watch_tags = monitor_config.get('watch_tags', False)
            self.tag_pattern = monitor_config.get('tag_pattern', 'v*')

            # Logging settings.
            log_config = self.config.get('logging', {})
            log_file = log_config.get('file', '.devops/logs/devops.log')
            max_size = log_config.get('max_size', 10485760)
            Logger.init(log_file=log_file, max_size=max_size)

            Logger.info(f"✓ 配置加载成功 - 全局分支: {self.global_branch}")
            Logger.info(f"✓ Tag 监听: {'已启用' if self.watch_tags else '未启用'} (模式: {self.tag_pattern})")
            Logger.info(f"✓ 日志配置 - 文件: {log_file}, 最大大小: {max_size} 字节")
        except Exception as e:
            Logger.error(f"配置加载失败: {e}")
            sys.exit(1)

    def _init_paths(self):
        """Resolve the project root and the runtime directory.

        A relative ``main_repository.runtime_path`` is resolved against the
        project root (the parent of this file's directory). Exits the
        process with status 1 on any failure.
        """
        Logger.info("[步骤 2/3] 初始化路径")
        try:
            self.project_root = Path(__file__).parent.parent.resolve()
            runtime_path = Path(self.config['main_repository']['runtime_path'])
            if runtime_path.is_absolute():
                self.runtime_path = runtime_path
            else:
                self.runtime_path = self.project_root / runtime_path
            Logger.info("✓ 路径初始化成功")
            Logger.info(f" 项目根目录: {self.project_root}")
            Logger.info(f" Runtime 目录: {self.runtime_path}")
        except Exception as e:
            Logger.error(f"路径初始化失败: {e}")
            sys.exit(1)

    def _init_dingtalk(self):
        """Create the DingTalk notifier when it is enabled in the config.

        Failures are logged as warnings; the monitor keeps running without
        notifications.
        """
        try:
            dingtalk_config = self.config.get('dingtalk', {})
            if not dingtalk_config.get('enabled', False):
                Logger.info("钉钉通知未启用")
                return
            access_token = dingtalk_config.get('access_token')
            secret = dingtalk_config.get('secret')
            if access_token:
                self.dingtalk_notifier = DingTalkNotifier(access_token, secret)
                Logger.info("✓ 钉钉通知已启用")
            else:
                Logger.warning("钉钉通知已启用但未配置 access_token")
        except Exception as e:
            Logger.warning(f"钉钉通知初始化失败: {e}")

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _run_shell(cmd, cwd=None, timeout=None):
        """Run *cmd* through the shell and return the CompletedProcess.

        Commands stay shell strings on purpose: they are assembled from the
        operator-controlled config file, and some entries (for example
        ``pre_deploy_commands``) are arbitrary shell snippets.
        """
        return subprocess.run(
            cmd, shell=True, cwd=cwd, capture_output=True, text=True,
            timeout=timeout
        )

    def _notify_failure(self, repo_name, branch, commit_hash, error_msg, at_all=False):
        """Send a build-failure notification if a notifier is configured."""
        if not self.dingtalk_notifier:
            return
        # Only pass at_all when requested so ordinary failures keep using
        # the notifier's own default.
        extra = {'at_all': True} if at_all else {}
        self.dingtalk_notifier.send_build_failure(
            repo_name=repo_name,
            branch=branch,
            commit_hash=commit_hash,
            error_msg=error_msg,
            **extra
        )

    @staticmethod
    def _parse_tag_refs(ls_remote_output):
        """Parse ``git ls-remote --tags`` output into ``{tag: hash}``.

        Annotated tags are listed twice: once with the hash of the tag
        object and once as ``<name>^{}`` with the commit it points to. The
        peeled commit hash wins so that every value is a commit hash.
        """
        prefix = 'refs/tags/'
        peel_suffix = '^{}'
        direct = {}
        peeled = {}
        for line in ls_remote_output.splitlines():
            parts = line.split()
            if len(parts) < 2:
                continue
            object_hash, ref = parts[0], parts[1]
            if not ref.startswith(prefix):
                continue
            tag_name = ref[len(prefix):]
            if tag_name.endswith(peel_suffix):
                peeled[tag_name[:-len(peel_suffix)]] = object_hash
            else:
                direct[tag_name] = object_hash
        return {**direct, **peeled}

    @staticmethod
    def _version_sort_key(tag_name):
        """Return a key that orders tag names with numeric runs as numbers.

        ``v1.10.0`` therefore sorts after ``v1.9.0``, which plain string
        comparison gets wrong.
        """
        import re

        # Splitting on a capturing group alternates text / digits / text,
        # so odd positions are always digit runs and keys stay comparable.
        tokens = re.split(r'([0-9]+)', tag_name)
        return [int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens)]

    # ------------------------------------------------------------------
    # Remote inspection
    # ------------------------------------------------------------------
    def get_server_ip(self):
        """Return this host's outbound IP address, or ``"unknown"``.

        Connecting a UDP socket selects the outgoing interface; if that
        fails the address of the local host name is used instead.
        """
        try:
            # The context manager closes the socket even when connect()
            # raises.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("8.8.8.8", 80))
                return probe.getsockname()[0]
        except Exception:
            try:
                return socket.gethostbyname(socket.gethostname())
            except Exception:
                return "unknown"

    def get_remote_commit(self, repo_url, branch):
        """Return the hash at the head of *branch* on the remote.

        Returns:
            The hash string, or ``None`` when the lookup fails or the
            branch is not found.
        """
        try:
            cmd = f"git ls-remote {repo_url} refs/heads/{branch}"
            result = self._run_shell(cmd, timeout=30)
            if result.returncode == 0 and result.stdout:
                return result.stdout.split()[0]
            return None
        except Exception as e:
            Logger.error(f"获取远程提交失败 {repo_url}: {e}")
            return None

    def get_commit_message(self, repo_url, commit_hash):
        """Return a short label for *commit_hash*.

        ``git ls-remote`` cannot provide commit messages, so the label is
        built from the abbreviated hash only. ``repo_url`` is kept for
        interface compatibility and is not used.
        """
        return f"提交 {commit_hash[:8]}"

    def get_remote_tags(self, repo_url):
        """Return ``{tag name: commit hash}`` for all tags on the remote.

        Returns:
            A dict of tags; an empty dict when the remote has no tags or
            the lookup fails (failures are logged).
        """
        try:
            cmd = f"git ls-remote --tags {repo_url}"
            result = self._run_shell(cmd, timeout=30)
            if result.returncode == 0 and result.stdout:
                return self._parse_tag_refs(result.stdout)
            return {}
        except Exception as e:
            Logger.error(f"获取远程 tags 失败 {repo_url}: {e}")
            return {}

    # ------------------------------------------------------------------
    # Change detection
    # ------------------------------------------------------------------
    def check_repository(self, repo_config):
        """Return True when the watched branch has a new head commit.

        The first successful lookup only records the current head and
        returns False.
        """
        repo_name = repo_config['name']
        repo_url = repo_config['url']

        current_commit = self.get_remote_commit(repo_url, self.global_branch)
        if not current_commit:
            return False

        last_commit = self.last_commits.get(repo_name)
        if last_commit is None:
            self.last_commits[repo_name] = current_commit
            Logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
            return False

        if current_commit != last_commit:
            Logger.info(f"检测到 {repo_name} 新提交: {last_commit[:8]} -> {current_commit[:8]}")
            self.last_commits[repo_name] = current_commit
            return True
        return False

    def check_repository_tags(self, repo_config):
        """Check whether a new tag matching ``tag_pattern`` has appeared.

        Returns:
            ``(True, (tag_name, commit_hash))`` for the newest new tag, or
            ``(False, None)`` when there is nothing new or the tags could
            not be fetched.
        """
        import fnmatch

        repo_name = repo_config['name']
        repo_url = repo_config['url']

        current_tags = self.get_remote_tags(repo_url)
        if not current_tags:
            # The fetcher reports failures as an empty result. Keep the
            # previous snapshot: overwriting it with nothing would make
            # every existing tag look new on the next successful poll.
            error_msg = f"获取 {repo_name} 远程 tags 失败"
            Logger.error(error_msg)
            # Only notify once tags were fetched successfully before, so a
            # repository that simply has no tags yet stays quiet.
            if repo_name in self.last_tags:
                self._notify_failure(
                    repo_name, self.global_branch, 'unknown', error_msg
                )
            return False, None

        # NOTE(review): on the very first poll the snapshot is empty, so
        # every matching tag counts as new and a deployment is triggered —
        # unlike branch commits, which are only recorded. Confirm intended.
        last_tags = self.last_tags.get(repo_name, {})
        new_tags = [
            (tag_name, commit_hash)
            for tag_name, commit_hash in current_tags.items()
            if fnmatch.fnmatch(tag_name, self.tag_pattern)
            and tag_name not in last_tags
        ]
        self.last_tags[repo_name] = current_tags

        if not new_tags:
            return False, None

        # Pick the highest version; the name itself breaks ties.
        latest_tag = max(
            new_tags,
            key=lambda item: (self._version_sort_key(item[0]), item[0])
        )
        Logger.info(f"检测到 {repo_name} 新 tag: {latest_tag[0]} ({latest_tag[1][:8]})")
        return True, latest_tag

    # ------------------------------------------------------------------
    # Deployment steps
    # ------------------------------------------------------------------
    def update_main_repo(self):
        """Clone or update the main repository and all of its submodules.

        Returns:
            True on success, False as soon as a required git command
            fails.
        """
        repo_path = self.runtime_path / 'a-cloud-all'
        main_repo_url = self.config['main_repository']['url']

        Logger.separator()
        Logger.info("更新主仓库和子模块")
        Logger.separator()

        if not (repo_path / '.git').exists():
            Logger.info("主仓库不存在,开始克隆...")
            self.runtime_path.mkdir(parents=True, exist_ok=True)
            cmd = f"git clone --recurse-submodules {main_repo_url} a-cloud-all"
            if self._run_shell(cmd, cwd=self.runtime_path).returncode != 0:
                Logger.error("克隆主仓库失败")
                return False
            Logger.info("主仓库克隆成功")
        else:
            Logger.info("主仓库已存在,更新代码...")
            # Best effort: the checkout result is not checked, a failure
            # shows up in the pull below.
            self._run_shell(f"git checkout {self.global_branch}", cwd=repo_path)
            if self._run_shell("git pull", cwd=repo_path).returncode != 0:
                Logger.error("拉取主仓库失败")
                return False

        # Initialise and update every submodule, including newly added
        # ones.
        cmd = "git submodule update --init --recursive"
        if self._run_shell(cmd, cwd=repo_path).returncode != 0:
            Logger.error("初始化子模块失败")
            return False

        # Move every submodule to the head of the watched branch.
        cmd = f"git submodule foreach 'git checkout {self.global_branch} && git pull'"
        if self._run_shell(cmd, cwd=repo_path).returncode != 0:
            Logger.error("更新子模块失败")
            return False

        Logger.info("主仓库和子模块更新成功")
        return True

    def init_infrastructure(self):
        """Build and start infrastructure services that are not deployed yet.

        Each entry of the ``infrastructure`` config list is handled once: a
        ``.deployed_<name>`` flag file marks services that were already
        started.

        Returns:
            True when every service is available, False on the first
            failure.
        """
        repo_path = self.runtime_path / 'a-cloud-all'
        docker_dir = repo_path / 'docker'

        for infra in self.config.get('infrastructure', []):
            service_name = infra['name']
            docker_service = infra['docker_service']
            wait_time = infra.get('wait_time', 10)

            flag_file = repo_path / '.devops' / f'.deployed_{service_name}'
            if flag_file.exists():
                continue

            Logger.info(f"初始化 {service_name}...")

            # Optional commands to run before the service is built.
            pre_deploy_commands = infra.get('pre_deploy_commands', [])
            if pre_deploy_commands:
                Logger.info(f"执行 {service_name} 预部署命令...")
                for cmd in pre_deploy_commands:
                    Logger.info(f"执行命令: {cmd}")
                    result = self._run_shell(cmd, cwd=repo_path)
                    if result.returncode != 0:
                        Logger.error(f"预部署命令执行失败: {result.stderr}")
                        return False

            Logger.info(f"构建 {service_name} 镜像...")
            result = self._run_shell(
                f"docker-compose build --no-cache {docker_service}",
                cwd=docker_dir
            )
            if result.returncode != 0:
                Logger.error(f"{service_name} 镜像构建失败: {result.stderr}")
                return False
            Logger.info(f"{service_name} 镜像构建成功")

            Logger.info(f"启动 {service_name} 容器...")
            result = self._run_shell(
                f"docker-compose up -d {docker_service}", cwd=docker_dir
            )
            if result.returncode != 0:
                Logger.error(f"{service_name} 容器启动失败: {result.stderr}")
                return False
            Logger.info(f"{service_name} 容器启动成功")

            # Mark the service as deployed so later runs skip it.
            flag_file.parent.mkdir(parents=True, exist_ok=True)
            flag_file.touch()

            # Give the freshly started service time to come up.
            Logger.info(f"等待 {service_name} 启动({wait_time}秒)...")
            time.sleep(wait_time)
        return True

    def _read_commit_message(self, submodule_path, commit_hash):
        """Return the subject line of *commit_hash*, or None if unavailable."""
        if not submodule_path.exists():
            return None
        try:
            cmd = f"git log -1 --pretty=format:'%s' {commit_hash}"
            result = self._run_shell(cmd, cwd=submodule_path, timeout=10)
            if result.returncode == 0 and result.stdout:
                commit_message = result.stdout.strip()
                Logger.info(f"提交消息: {commit_message}")
                return commit_message
        except Exception as e:
            Logger.warning(f"获取提交消息失败: {e}")
        return None

    def _copy_python_sources(self, source_path, target_dir):
        """Replace the contents of *target_dir* with the Python sources.

        Hidden entries in the target (such as ``.gitkeep``) are kept;
        VCS, cache and virtualenv directories of the source are skipped.
        """
        import shutil

        if target_dir.exists():
            for item in target_dir.iterdir():
                if item.name.startswith('.'):
                    continue
                if item.is_dir():
                    shutil.rmtree(item)
                else:
                    item.unlink()
            Logger.info("清空目标目录完成")
        else:
            target_dir.mkdir(parents=True, exist_ok=True)
            Logger.info("创建目标目录完成")

        skipped = {'.git', '__pycache__', '.pytest_cache', '.venv', 'venv'}
        for item in source_path.iterdir():
            if item.name in skipped:
                continue
            target_item = target_dir / item.name
            if item.is_dir():
                # copytree refuses to write into an existing directory.
                if target_item.exists():
                    shutil.rmtree(target_item)
                shutil.copytree(item, target_item)
            else:
                shutil.copy2(item, target_item)
        Logger.info("源码复制完成")

    def _build_artifacts(self, repo_config, repo_path):
        """Run the build step that matches ``repo_config['type']``.

        Returns:
            None on success, otherwise the error message to report. A type
            other than java / nodejs / python has no build step and counts
            as success.
        """
        project_type = repo_config['type']

        if project_type == 'java':
            commands = ' && '.join(repo_config['build_commands'])
            source_path = repo_config['path'] + '/' + repo_config['artifact_path']
            target_dir = repo_path / repo_config['docker_path']
            success, error_msg = maven.run_maven(repo_path, commands, source_path, target_dir)
            return None if success else f"Maven 打包失败: {error_msg}"

        if project_type == 'nodejs':
            work_dir = repo_path / repo_config['path']
            commands = ' && '.join(repo_config['build_commands'])
            source_dir = repo_config['artifact_path']
            target_dir = repo_path / repo_config['docker_path']
            success, error_msg = npm.run_npm(work_dir, commands, source_dir, target_dir)
            return None if success else f"NPM/PNPM 打包失败: {error_msg}"

        if project_type == 'python':
            # Python projects are not built; the sources are copied into
            # the docker directory.
            Logger.separator()
            Logger.info("开始 Python 项目部署")
            Logger.separator()
            source_path = repo_path / repo_config['path']
            target_dir = repo_path / repo_config['docker_path']
            Logger.info(f"源码目录: {source_path}")
            Logger.info(f"目标目录: {target_dir}")
            try:
                self._copy_python_sources(source_path, target_dir)
            except Exception as e:
                error_msg = f"Python 项目源码复制失败: {str(e)}"
                Logger.error(error_msg)
                return error_msg
        return None

    def deploy(self, repo_config, tag_name=None):
        """Run the full deployment pipeline for one repository.

        Args:
            repo_config: Repository entry from the config file.
            tag_name: Optional tag name; when given, the deployment was
                triggered by that tag rather than a branch commit.

        Returns:
            True when the deployment succeeded, False otherwise.
        """
        repo_path = self.runtime_path / 'a-cloud-all'
        repo_name = repo_config['name']
        commit_hash = self.last_commits.get(repo_name, 'unknown')
        # One label for every notification of this run, so start, failure
        # and success messages agree on what was deployed.
        branch_label = f"tag/{tag_name}" if tag_name else self.global_branch
        start_time = time.time()

        Logger.separator()
        Logger.info(f"开始部署: {repo_name}")
        if tag_name:
            Logger.info(f"触发方式: Tag ({tag_name})")
        else:
            Logger.info("触发方式: 分支提交")
        Logger.separator()

        try:
            # 1. Update the main repository and its submodules.
            if not self.update_main_repo():
                self._notify_failure(
                    repo_name, branch_label, commit_hash,
                    "Git 仓库更新失败(主仓库或子模块)"
                )
                return False

            commit_message = self._read_commit_message(
                repo_path / repo_config['path'], commit_hash
            )
            server_ip = self.get_server_ip()

            if self.dingtalk_notifier:
                display_message = commit_message
                if tag_name:
                    display_message = f"Tag: {tag_name}" + (f" - {commit_message}" if commit_message else "")
                self.dingtalk_notifier.send_build_start(
                    repo_name=repo_name,
                    branch=branch_label,
                    commit_hash=commit_hash,
                    commit_message=display_message,
                    server_ip=server_ip
                )

            # 2. Make sure the infrastructure services are running.
            if not self.init_infrastructure():
                self._notify_failure(
                    repo_name, branch_label, commit_hash,
                    "基础设施初始化失败MySQL/Redis/Nacos等"
                )
                return False

            # 3. Build according to the project type.
            build_error = self._build_artifacts(repo_config, repo_path)
            if build_error is not None:
                self._notify_failure(
                    repo_name, branch_label, commit_hash, build_error
                )
                return False

            # 4. Redeploy the docker-compose service.
            compose_dir = repo_path / 'docker'
            service_name = repo_config['docker_service']
            if not docker.run_docker_compose(compose_dir, service_name):
                self._notify_failure(
                    repo_name, branch_label, commit_hash, "Docker 部署失败"
                )
                return False

            duration = time.time() - start_time
            if self.dingtalk_notifier:
                self.dingtalk_notifier.send_build_success(
                    repo_name=repo_name,
                    branch=branch_label,
                    commit_hash=commit_hash,
                    duration=duration
                )
            Logger.info(f"部署完成: {repo_config['name']}")
            return True
        except Exception as e:
            # Log first so the error is recorded even if notifying fails.
            Logger.error(f"部署异常: {e}")
            self._notify_failure(
                repo_name, branch_label, commit_hash, str(e), at_all=True
            )
            return False

    # ------------------------------------------------------------------
    # Polling
    # ------------------------------------------------------------------
    def run_once(self):
        """Check every configured repository once and deploy on changes.

        A branch change takes precedence: when it triggers a deployment the
        tag check for that repository is skipped for this round.
        """
        Logger.info("[步骤 3/3] 开始监听分支变化")
        repos = self.config.get('repositories', []) or []
        for repo_config in repos:
            try:
                if self.check_repository(repo_config):
                    Logger.info(f"触发部署: {repo_config['name']} (分支提交)")
                    if self.deploy(repo_config):
                        Logger.info(f"✓ 部署成功: {repo_config['name']}")
                    else:
                        Logger.error(f"✗ 部署失败: {repo_config['name']}")
                    continue

                if not self.watch_tags:
                    continue
                has_new_tag, tag_info = self.check_repository_tags(repo_config)
                if not (has_new_tag and tag_info):
                    continue

                tag_name, tag_commit = tag_info
                repo_name = repo_config['name']
                Logger.info(f"触发部署: {repo_name} (新 tag: {tag_name})")

                # deploy() reads the hash to report from last_commits, so
                # expose the tag's commit there for the duration of the
                # deployment, then put the tracked branch head back.
                # Leaving the tag hash in place would make the next branch
                # check see a "new commit" and deploy again.
                branch_head = self.last_commits.get(repo_name)
                self.last_commits[repo_name] = tag_commit
                try:
                    deployed = self.deploy(repo_config, tag_name=tag_name)
                finally:
                    if branch_head is None:
                        self.last_commits.pop(repo_name, None)
                    else:
                        self.last_commits[repo_name] = branch_head

                if deployed:
                    Logger.info(f"✓ 部署成功: {repo_name}")
                else:
                    Logger.error(f"✗ 部署失败: {repo_name}")
            except Exception as e:
                Logger.error(f"处理仓库异常 {repo_config['name']}: {e}")
                self._notify_failure(
                    repo_config['name'],
                    self.global_branch,
                    self.last_commits.get(repo_config['name'], 'unknown'),
                    f"处理仓库时发生异常: {str(e)}",
                    at_all=True
                )

    def run(self):
        """Poll forever, sleeping ``monitor.poll_interval`` seconds per round.

        Stops on Ctrl+C or on an unexpected error, which is logged.
        """
        # The monitor section is optional elsewhere, so fall back to a
        # default instead of failing with KeyError.
        monitor_config = self.config.get('monitor') or {}
        poll_interval = monitor_config.get('poll_interval', self.DEFAULT_POLL_INTERVAL)
        Logger.info(f"开始持续监听,轮询间隔: {poll_interval}")
        Logger.info("按 Ctrl+C 停止监听\n")
        try:
            while True:
                self.run_once()
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            Logger.info("\n收到停止信号,退出监听")
        except Exception as e:
            Logger.error(f"监听异常: {e}")
def main():
    """Command-line entry point: parse the options and start the monitor.

    ``--config`` selects the configuration file; ``--once`` performs a
    single check instead of polling continuously.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Git 仓库监听器')
    cli.add_argument('--config', default='.devops/config.yaml', help='配置文件路径')
    cli.add_argument('--once', action='store_true', help='只执行一次检查')
    options = cli.parse_args()

    monitor = GitMonitor(options.config)
    # Choose the run mode first, then invoke it.
    action = monitor.run_once if options.once else monitor.run
    action()


if __name__ == '__main__':
    main()