199 lines
6.4 KiB
Python
199 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Git 仓库监听器
|
||
监听多个 Git 仓库的指定分支,检测到新提交时触发部署
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import yaml
|
||
import logging
|
||
import subprocess
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
# 添加当前目录到 Python 路径
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from deployer import Deployer
|
||
|
||
|
||
class GitMonitor:
|
||
"""Git 仓库监听器"""
|
||
|
||
def __init__(self, config_path='.devops/config.yaml'):
|
||
"""初始化监听器"""
|
||
self.config_path = config_path
|
||
self.config = self._load_config()
|
||
self._setup_logging()
|
||
self.deployer = Deployer(self.config)
|
||
self.last_commits = {} # 存储每个仓库的最后一次提交 hash
|
||
|
||
# 读取全局分支配置
|
||
self.global_branch = self.config.get('global_branch', 'main')
|
||
|
||
self.logger.info("Git 监听器初始化完成")
|
||
self.logger.info(f"监听分支: {self.global_branch}")
|
||
|
||
def _load_config(self):
|
||
"""加载配置文件"""
|
||
with open(self.config_path, 'r', encoding='utf-8') as f:
|
||
return yaml.safe_load(f)
|
||
|
||
def _setup_logging(self):
|
||
"""设置日志"""
|
||
log_config = self.config.get('logging', {})
|
||
log_level = getattr(logging, log_config.get('level', 'INFO'))
|
||
log_file = log_config.get('file', '.devops/logs/devops.log')
|
||
|
||
# 确保日志目录存在
|
||
os.makedirs(os.path.dirname(log_file), exist_ok=True)
|
||
|
||
# 配置日志格式
|
||
formatter = logging.Formatter(
|
||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
|
||
# 文件处理器
|
||
file_handler = logging.FileHandler(log_file, encoding='utf-8')
|
||
file_handler.setFormatter(formatter)
|
||
file_handler.setLevel(log_level)
|
||
|
||
# 控制台处理器
|
||
console_handler = logging.StreamHandler()
|
||
console_handler.setFormatter(formatter)
|
||
console_handler.setLevel(log_level)
|
||
|
||
# 配置根 logger,让所有子 logger 都能输出
|
||
root_logger = logging.getLogger()
|
||
root_logger.setLevel(log_level)
|
||
root_logger.addHandler(file_handler)
|
||
root_logger.addHandler(console_handler)
|
||
|
||
# 配置当前 logger
|
||
self.logger = logging.getLogger('GitMonitor')
|
||
self.logger.setLevel(log_level)
|
||
|
||
def get_remote_commit(self, repo_url, branch):
|
||
"""获取远程仓库的最新提交 hash"""
|
||
try:
|
||
cmd = f"git ls-remote {repo_url} refs/heads/{branch}"
|
||
result = subprocess.run(
|
||
cmd, shell=True, capture_output=True, text=True, timeout=30
|
||
)
|
||
if result.returncode == 0 and result.stdout:
|
||
commit_hash = result.stdout.split()[0]
|
||
return commit_hash
|
||
return None
|
||
except Exception as e:
|
||
self.logger.error(f"获取远程提交失败 {repo_url}: {e}")
|
||
return None
|
||
|
||
def check_repository(self, repo_config):
|
||
"""检查单个仓库是否有新提交"""
|
||
repo_name = repo_config['name']
|
||
repo_url = repo_config['url']
|
||
|
||
self.logger.debug(f"检查仓库: {repo_name} (分支: {self.global_branch})")
|
||
|
||
# 获取最新提交(使用全局分支配置)
|
||
current_commit = self.get_remote_commit(repo_url, self.global_branch)
|
||
if not current_commit:
|
||
self.logger.warning(f"无法获取 {repo_name} 的最新提交")
|
||
return False
|
||
|
||
# 检查是否有新提交
|
||
last_commit = self.last_commits.get(repo_name)
|
||
if last_commit is None:
|
||
# 首次检查,记录当前提交
|
||
self.last_commits[repo_name] = current_commit
|
||
self.logger.info(f"初始化 {repo_name} 提交记录: {current_commit[:8]}")
|
||
return False
|
||
|
||
if current_commit != last_commit:
|
||
self.logger.info(
|
||
f"检测到 {repo_name} 有新提交: {last_commit[:8]} -> {current_commit[:8]}"
|
||
)
|
||
self.last_commits[repo_name] = current_commit
|
||
return True
|
||
|
||
return False
|
||
|
||
def get_enabled_repos(self):
|
||
"""获取需要监听的仓库列表"""
|
||
enabled = self.config['monitor'].get('enabled_repos', [])
|
||
all_repos = self.config['repositories']
|
||
|
||
if not enabled:
|
||
# 空列表表示监听所有仓库
|
||
return all_repos
|
||
|
||
# 只返回启用的仓库
|
||
return [repo for repo in all_repos if repo['name'] in enabled]
|
||
|
||
def run_once(self):
|
||
"""执行一次检查"""
|
||
repos = self.get_enabled_repos()
|
||
self.logger.info(f"开始检查 {len(repos)} 个仓库...")
|
||
|
||
for repo_config in repos:
|
||
try:
|
||
if self.check_repository(repo_config):
|
||
# 检测到新提交,触发部署
|
||
self.logger.info(f"触发部署: {repo_config['name']}")
|
||
success = self.deployer.deploy(repo_config)
|
||
|
||
if success:
|
||
self.logger.info(f"部署成功: {repo_config['name']}")
|
||
else:
|
||
self.logger.error(f"部署失败: {repo_config['name']}")
|
||
except Exception as e:
|
||
self.logger.error(f"处理仓库 {repo_config['name']} 时出错: {e}", exc_info=True)
|
||
|
||
def run(self):
|
||
"""持续监听运行"""
|
||
poll_interval = self.config['monitor']['poll_interval']
|
||
self.logger.info(f"开始监听 Git 仓库,轮询间隔: {poll_interval} 秒")
|
||
self.logger.info("按 Ctrl+C 停止监听")
|
||
|
||
try:
|
||
while True:
|
||
self.run_once()
|
||
time.sleep(poll_interval)
|
||
except KeyboardInterrupt:
|
||
self.logger.info("收到停止信号,退出监听")
|
||
except Exception as e:
|
||
self.logger.error(f"监听过程中发生错误: {e}", exc_info=True)
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description='Git 仓库监听器')
|
||
parser.add_argument(
|
||
'--config',
|
||
default='.devops/config.yaml',
|
||
help='配置文件路径'
|
||
)
|
||
parser.add_argument(
|
||
'--once',
|
||
action='store_true',
|
||
help='只执行一次检查,不持续监听'
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
monitor = GitMonitor(args.config)
|
||
|
||
if args.once:
|
||
monitor.run_once()
|
||
else:
|
||
monitor.run()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|