# tuoheng_AIPlatform/yolov5-th/ai_auto_train_yolov5.py
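"""YOLOv5 auto-training server.

Listens on the 'training-tasks' Kafka topic for start/stop commands, runs each
training job on a background thread, streams progress to the 'train_events'
and 'train_monitor' topics and to the web backend's updateTrainTask HTTP
endpoint, and uploads the best weights to MinIO when they improve.
"""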
# Modifications added at the top of train.py: silence GitPython and disable W&B
import os
os.environ['GIT_PYTHON_REFRESH'] = 'quiet'
os.environ['WANDB_MODE'] = 'disabled'

import argparse
import datetime
import io
import json
import logging
import math
import random
import threading
import time
import uuid
from ast import literal_eval
from copy import deepcopy
from pathlib import Path
from threading import Thread

import cv2
import numpy as np
import requests
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import torch.utils.data
import urllib3
import yaml
from flask import Flask, request, jsonify
from flask_restful import Api
from kafka import KafkaConsumer, KafkaProducer
from minio import Minio
from torch.cuda import amp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

import test  # import test.py to get mAP after each epoch
from models.experimental import attempt_load
from models.yolo import Model
from utils.autoanchor import check_anchors
from utils.datasets import create_dataloader
from utils.general import labels_to_class_weights, increment_path, labels_to_image_weights, init_seeds, \
    fitness, strip_optimizer, get_latest_run, check_dataset, check_file, check_git_status, check_img_size, \
    check_requirements, print_mutation, set_logging, one_cycle, colorstr
from utils.google_utils import attempt_download
from utils.loss import ComputeLoss
from utils.plots import plot_images, plot_labels, plot_results, plot_evolution
from utils.torch_utils import ModelEMA, select_device, intersect_dicts, torch_distributed_zero_first, is_parallel
from utils.wandb_logging.wandb_utils import WandbLogger, check_wandb_resume

np.int = np.int64  # global shim for the deprecated np.int alias

# jcq: add web-side data reporting - 0508
app = Flask(__name__)
api = Api(app)
# MinIO object storage configuration
ENDPOINT = "minio-jndsj.t-aaron.com:2443"  # MinIO server address
ACCESS_KEY = "PJM0c2qlauoXv5TMEHm2"  # access key
SECRET_KEY = "Wr69Dm3ZH39M3GCSeyB3eFLynLPuGCKYfphixZuI"  # secret key

# Create the MinIO client
minioClient = Minio(
    ENDPOINT,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY
)

logger = logging.getLogger(__name__)

topic_train = 'training-tasks'

# jcq: web-side data reporting - 0508
# Replace with the actual target URL
url = "http://172.18.0.129:8084/api/admin/trainTask/updateTrainTask"
# Custom request headers; adjust to match the actual interface
headers = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}
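
# TrainingServer consumes training requests from the 'training-tasks' Kafka
# topic, launches each job on a daemon thread, and reports progress back over
# two channels: Kafka events ('train_events' / 'train_monitor') and HTTP POSTs
# to the web backend's updateTrainTask endpoint. Each running task is tracked
# in running_tasks as {request_id: (thread, stop_event)} so that a later
# 'stop' command can signal the matching thread.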
class TrainingServer:
    def __init__(self):
        self.shutdown_flag = False
        self.current_training_thread = None
        self.running_tasks = {}  # Dictionary to track running tasks {request_id: (thread, stop_event)}
        self.kafka_producer = None
        self.setup_logging()
        self.setup_producer()

    def setup_producer(self):
        """Initialize and maintain the Kafka producer connection."""
        producer_config = {
            'bootstrap_servers': '172.18.0.126:9094',
            'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
            'retries': 5,
            'retry_backoff_ms': 1000,
            'request_timeout_ms': 30000,
            'max_block_ms': 60000,
            'acks': 'all'  # ensure message durability
        }
        try:
            self.kafka_producer = KafkaProducer(**producer_config)
            logger.info("Kafka producer initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Kafka producer: {e}")
            self.kafka_producer = None

    def get_producer(self):
        """Return a usable producer instance, recreating it if necessary."""
        if self.kafka_producer is None:
            self.setup_producer()
        return self.kafka_producer
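
    # Note on delivery semantics used below: kafka-python's send() returns a
    # FutureRecordMetadata; success callbacks registered with add_callback()
    # receive the record metadata, and failure callbacks registered with
    # add_errback() receive the exception. On any failure the producer handle
    # is dropped to None so get_producer() rebuilds it on the next call.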
    def safe_send_metrics(self, topic, data):
        """Thread-safe metric sending with error handling."""
        producer = self.get_producer()
        if producer:
            try:
                future = producer.send(
                    topic,
                    value=data,
                    timestamp_ms=int(time.time() * 1000))

                def on_success(metadata):
                    logger.debug(f"Message delivered to {metadata.topic}[{metadata.partition}]")

                def on_error(err):
                    logger.error(f"Message delivery failed: {err}")
                    self.kafka_producer = None

                future.add_callback(on_success)
                future.add_errback(on_error)
                return True
            except Exception as e:
                logger.error(f"Error sending metrics: {e}")
                self.kafka_producer = None
        return False
    def start_training(self, opt, request_id, version, dspModelCode):
        """Start training with the given options on a background thread."""
        try:
            # Determine the save directory if not already set
            if not hasattr(opt, 'save_dir') or not opt.save_dir:
                opt.save_dir = str(Path(opt.project) / opt.name)
            opt.save_dir = increment_path(Path(opt.save_dir), exist_ok=opt.exist_ok)

            # Load hyperparameters
            with open(opt.hyp) as f:
                hyp = yaml.load(f, Loader=yaml.SafeLoader)

            # Set device
            device = select_device(opt.device, batch_size=opt.batch_size)

            self.current_training_thread = threading.Thread(
                target=self.train_wrapper,
                args=(hyp, opt, device, request_id, version, dspModelCode),
                daemon=True
            )
            self.current_training_thread.start()
        except Exception as e:
            logger.error(f"Error starting training: {e}")
    def stop_training(self, request_id):
        """Stop a specific training task by request_id."""
        if request_id in self.running_tasks:
            thread, stop_event = self.running_tasks[request_id]
            stop_event.set()
            thread.join(timeout=5)
            if thread.is_alive():
                logger.warning(f"Training thread for {request_id} did not stop gracefully")
            del self.running_tasks[request_id]
            logger.info(f"Stopped training task {request_id}")

            # Prepare and send the API request
            payload = {
                "id": request_id,  # the actual request_id rather than a hardcoded value
                "status": 5
            }
            try:
                response = requests.post(
                    "http://172.18.0.129:8084/api/admin/trainTask/updateTrainTask",
                    json=payload
                )
                response.raise_for_status()  # raise for bad status codes
                logger.info(f"Successfully updated task {request_id} via API")
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to update task {request_id} via API: {str(e)}")
            return True
        else:
            logger.warning(f"No running training task found with ID {request_id}")
            return False
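
    # Cancellation is cooperative: train_wrapper registers a stop_event for
    # the task, and train() checks it at each epoch boundary, so a 'stop'
    # command takes effect when the current epoch finishes, not mid-batch.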
    def train_wrapper(self, hyp, opt, device, request_id, version, dspModelCode):
        """Wrapper around train() to handle exceptions and stop events."""
        stop_event = threading.Event()
        self.running_tasks[request_id] = (threading.current_thread(), stop_event)
        try:
            logger.info(f"Starting training for request {request_id}")
            # Send training start notification
            self.safe_send_metrics('train_events', {
                'event_type': 'training_started',
                'request_id': request_id,
                'timestamp': time.time(),
                'params': vars(opt)
            })
            # Invoke the training function (device is forced to CPU here)
            device = torch.device('cpu')
            train(hyp, opt, device, request_id, version, dspModelCode, None, stop_event)
            logger.info(f"Training completed successfully for request {request_id}")
            self.safe_send_metrics('train_events', {
                'event_type': 'training_completed',
                'request_id': request_id,
                'timestamp': time.time()
            })
        except Exception as e:
            logger.error(f"Training failed for request {request_id}: {str(e)}", exc_info=True)
            # Build the error response payload
            error_data = {
                "id": request_id,
                "status": 5,  # error status code
                "error": str(e)
            }
            # Send the error notification to the API
            try:
                response = requests.post(
                    "http://172.18.0.129:8084/api/admin/trainTask/updateTrainTask",
                    headers=headers,
                    json=error_data
                )
                response.raise_for_status()
                logger.info(f"Successfully sent error notification for task {request_id}")
            except requests.exceptions.RequestException as api_err:
                logger.error(f"Failed to send error notification: {api_err}")
            # Send the error event to Kafka
            self.safe_send_metrics('train_events', {
                'event_type': 'training_failed',
                'request_id': request_id,
                'timestamp': time.time(),
                'error': str(e)
            })
        finally:
            if request_id in self.running_tasks:
                del self.running_tasks[request_id]
    def run_server(self):
        """Main server loop that listens for training requests."""
        logger.info("Starting YOLOv5 Training Server")
        while not self.shutdown_flag:
            try:
                consumer = self.setup_kafka_consumer()
                logger.info("Kafka consumer initialized, waiting for training requests...")
                for message in consumer:
                    if self.shutdown_flag:
                        logger.info("Shutdown flag detected, stopping consumer...")
                        break
                    try:
                        request_data = message.value
                        logger.info(f"Received training request: {request_data}")
                        command, data = self.process_training_request(request_data)
                        request_id, version, dspModelCode = (request_data.get("Request_ID"),
                                                             request_data.get("Version"),
                                                             request_data.get("Model"))
                        logger.debug(f"Request fields: {request_id}, {version}, {dspModelCode}")
                        if command == 'start':
                            opt, request_id = data
                            if request_id in self.running_tasks:
                                logger.warning(f"Training already in progress for ID {request_id}")
                                continue
                            logger.info(f"Starting training with params: {vars(opt)}")
                            self.start_training(opt, request_id, version, dspModelCode)
                        elif command == 'stop':
                            request_id = data
                            self.stop_training(request_id)
                    except Exception as e:
                        logger.error(f"Error processing Kafka message: {e}", exc_info=True)
                        continue
            except Exception as e:
                logger.error(f"Kafka consumer error: {e}", exc_info=True)
                continue
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def setup_kafka_consumer(self):
        """Set up the Kafka consumer with the proper configuration."""
        kafka_config = {
            'bootstrap_servers': '172.18.0.126:9094',  # use your actual broker IP
            'group_id': 'yolov5_training_server',
            'auto_offset_reset': 'latest',
            'enable_auto_commit': True,
            'auto_commit_interval_ms': 5000,
            'session_timeout_ms': 10000,
            'heartbeat_interval_ms': 3000,
            'max_poll_interval_ms': 300000,
            'value_deserializer': lambda x: json.loads(x.decode('utf-8'))
        }
        return KafkaConsumer(topic_train, **kafka_config)
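
    # TrainParameter arrives as a Python-literal list serialized to a string,
    # positional as [batch_size, imgsz, epochs, data, weights]; for example
    # (illustrative values): "[8, 640, 300, 'data/ship2.yaml', 'yolov5s.pt']".
    # Missing positions fall back to the defaults below.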
    def parse_training_parameters(self, train_params_str):
        """Parse the training parameters string."""
        try:
            params = literal_eval(train_params_str)
            if not isinstance(params, list):
                return None
            return {
                'batch_size': params[0] if len(params) > 0 else 8,
                'imgsz': params[1] if len(params) > 1 else 640,
                'epochs': params[2] if len(params) > 2 else 300,
                'data': params[3] if len(params) > 3 else 'data/ship2.yaml',
                'weights': params[4] if len(params) > 4 else 'yolov5s.pt'
            }
        except Exception as e:
            logger.error(f"Error parsing training parameters: {e}")
            return None
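
    # A start request (shape inferred from the fields read below; values are
    # illustrative) looks roughly like:
    #   {"Scene": "Train", "Command": "start", "Request_ID": "123",
    #    "Version": "V1.0", "Model": "027",
    #    "TrainParameter": "[8, 640, 300, 'data/ship2.yaml', 'yolov5s.pt']"}
    # and a stop request only needs Scene, Command and Request_ID.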
    def process_training_request(self, request_data):
        """Process an incoming training request with all required attributes."""
        if request_data.get('Scene') != 'Train':
            return None, None
        if request_data.get('Command') == 'stop':
            return 'stop', request_data.get('Request_ID')

        # Parse training parameters
        params = self.parse_training_parameters(request_data.get('TrainParameter', '[]'))
        if not params:
            params = {
                'batch_size': 8,
                'imgsz': 640,
                'epochs': 300,
                'data': 'data/ship2.yaml',
                'weights': 'yolov5s.pt'
            }

        # Create training arguments with all required attributes
        train_args = {
            'weights': params['weights'],
            'data': params['data'],
            'epochs': params['epochs'],
            'batch_size': params['batch_size'],
            'img_size': [params['imgsz'], params['imgsz']],
            'project': f"runs/train/{request_data.get('Request_ID', 'default')}",
            # 'name': request_data.get('Version', 'v1.0'),
            'name': request_data.get('Version'),
            'exist_ok': True,
            'from_kafka': True,
            # All required default attributes
            'save_dir': '',  # set later in start_training()
            'global_rank': -1,
            'local_rank': -1,
            'world_size': 1,
            'total_batch_size': params['batch_size'],
            'hyp': 'data/hyp.scratch.yaml',
            'cfg': '',
            'rect': False,
            'resume': False,
            'nosave': False,
            'notest': False,
            'noautoanchor': False,
            'cache_images': False,
            'image_weights': False,
            'device': '',
            'multi_scale': False,
            'single_cls': False,
            'adam': False,
            'sync_bn': False,
            'workers': 8,
            'entity': None,
            'upload_dataset': False,
            'bbox_interval': -1,
            'quad': False,
            'linear_lr': False,
            'label_smoothing': 0.0,
            'save_period': -1,
            'artifact_alias': "latest",
            'bucket': '',
            'evolve': False
        }
        opt = argparse.Namespace(**train_args)
        # Return the parsed command together with its arguments
        return 'start', (opt, request_data.get('Request_ID'))
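
# Status codes posted to updateTrainTask, as used in this file: 5 on stop and
# on wrapper-level failure, 3 on an exception inside train(), and 2/4 in the
# periodic progress payloads sent from the training loop below.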
def train(hyp, opt, device, task_id, version, dspModelCode, tb_writer=None, stop_event=None):
    """Main training function, modified to support being launched from Kafka."""
    try:
        logger.debug(f"train() called with task_id={task_id}, version={version}, dspModelCode={dspModelCode}")
        # Use the producer owned by the global server instance rather than
        # creating a local one, so reconnection logic stays in one place
        producer = server.get_producer()

        # When launched via Kafka, skip the git status / requirements checks
        if not getattr(opt, 'from_kafka', False):
            if opt.global_rank in [-1, 0]:
                # check_git_status()
                # check_requirements()
                pass

        logger.info(colorstr('hyperparameters: ') + ', '.join(f'{k}={v}' for k, v in hyp.items()))
        save_dir, epochs, batch_size, total_batch_size, weights, rank = \
            Path(opt.save_dir), opt.epochs, opt.batch_size, opt.total_batch_size, opt.weights, opt.global_rank

        # Directories
        wdir = save_dir / 'weights'
        wdir.mkdir(parents=True, exist_ok=True)  # make dir
        last = wdir / 'last.pt'
        best = wdir / 'best.pt'
        # jcq: fixed-name copy of the weights for upload
        yolov5 = wdir / 'yolov5.pt'
        results_file = save_dir / 'results.txt'

        # Save run settings
        with open(save_dir / 'hyp.yaml', 'w') as f:
            yaml.dump(hyp, f, sort_keys=False)
        with open(save_dir / 'opt.yaml', 'w') as f:
            yaml.dump(vars(opt), f, sort_keys=False)
        # Configure
        plots = not opt.evolve  # create plots
        cuda = device.type != 'cpu'
        init_seeds(2 + rank)
        with open(opt.data) as f:
            data_dict = yaml.load(f, Loader=yaml.SafeLoader)  # data dict
        is_coco = opt.data.endswith('coco.yaml')

        # Logging- Doing this before checking the dataset. Might update data_dict
        loggers = {'wandb': None}  # loggers dict
        if rank in [-1, 0]:
            opt.hyp = hyp  # add hyperparameters
            run_id = torch.load(weights).get('wandb_id') if weights.endswith('.pt') and os.path.isfile(weights) else None
            # run_id = torch.load(weights, weights_only=False).get('wandb_id') if weights.endswith('.pt') and os.path.isfile(weights) else None
            wandb_logger = WandbLogger(opt, Path(opt.save_dir).stem, run_id, data_dict)
            loggers['wandb'] = wandb_logger.wandb
            data_dict = wandb_logger.data_dict
            if wandb_logger.wandb:
                weights, epochs, hyp = opt.weights, opt.epochs, opt.hyp  # WandbLogger might update weights, epochs if resuming

        nc = 1 if opt.single_cls else int(data_dict['nc'])  # number of classes
        names = ['item'] if opt.single_cls and len(data_dict['names']) != 1 else data_dict['names']  # class names
        assert len(names) == nc, '%g names found for nc=%g dataset in %s' % (len(names), nc, opt.data)  # check

        # Model
        pretrained = weights.endswith('.pt')
        if pretrained:
            with torch_distributed_zero_first(rank):
                # attempt_download(weights)  # download if not found locally
                ckpt = torch.load(weights, map_location=device)  # load checkpoint
            model = Model(opt.cfg or ckpt['model'].yaml, ch=3, nc=nc, anchors=hyp.get('anchors')).to(device)  # create
            exclude = ['anchor'] if (opt.cfg or hyp.get('anchors')) and not opt.resume else []  # exclude keys
            state_dict = ckpt['model'].float().state_dict()  # to FP32
            state_dict = intersect_dicts(state_dict, model.state_dict(), exclude=exclude)  # intersect
            model.load_state_dict(state_dict, strict=False)  # load
            logger.info('Transferred %g/%g items from %s' % (len(state_dict), len(model.state_dict()), weights))  # report
        else:
            model = Model(opt.cfg, ch=3, nc=nc, anchors=hyp.get('anchors')).to(device)  # create
        with torch_distributed_zero_first(rank):
            check_dataset(data_dict)  # check
        train_path = data_dict['train']
        test_path = data_dict['val']
        # Freeze
        freeze = []  # parameter names to freeze (full or partial)
        for k, v in model.named_parameters():
            v.requires_grad = True  # train all layers
            if any(x in k for x in freeze):
                print('freezing %s' % k)
                v.requires_grad = False

        # Optimizer
        nbs = 64  # nominal batch size
        accumulate = max(round(nbs / total_batch_size), 1)  # accumulate loss before optimizing
        hyp['weight_decay'] *= total_batch_size * accumulate / nbs  # scale weight_decay
        logger.info(f"Scaled weight_decay = {hyp['weight_decay']}")
        pg0, pg1, pg2 = [], [], []  # optimizer parameter groups
        for k, v in model.named_modules():
            if hasattr(v, 'bias') and isinstance(v.bias, nn.Parameter):
                pg2.append(v.bias)  # biases
            if isinstance(v, nn.BatchNorm2d):
                pg0.append(v.weight)  # no decay
            elif hasattr(v, 'weight') and isinstance(v.weight, nn.Parameter):
                pg1.append(v.weight)  # apply decay

        if opt.adam:
            optimizer = optim.Adam(pg0, lr=hyp['lr0'], betas=(hyp['momentum'], 0.999))  # adjust beta1 to momentum
        else:
            optimizer = optim.SGD(pg0, lr=hyp['lr0'], momentum=hyp['momentum'], nesterov=True)

        optimizer.add_param_group({'params': pg1, 'weight_decay': hyp['weight_decay']})  # add pg1 with weight_decay
        optimizer.add_param_group({'params': pg2})  # add pg2 (biases)
        logger.info('Optimizer groups: %g .bias, %g conv.weight, %g other' % (len(pg2), len(pg1), len(pg0)))
        del pg0, pg1, pg2

        # Scheduler https://arxiv.org/pdf/1812.01187.pdf
        # https://pytorch.org/docs/stable/_modules/torch/optim/lr_scheduler.html#OneCycleLR
        if opt.linear_lr:
            lf = lambda x: (1 - x / (epochs - 1)) * (1.0 - hyp['lrf']) + hyp['lrf']  # linear
        else:
            lf = one_cycle(1, hyp['lrf'], epochs)  # cosine 1->hyp['lrf']
        scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
        # plot_lr_scheduler(optimizer, scheduler, epochs)

        # EMA
        ema = ModelEMA(model) if rank in [-1, 0] else None

        # Resume
        start_epoch, best_fitness = 0, 0.0
        if pretrained:
            # Optimizer
            if ckpt['optimizer'] is not None:
                optimizer.load_state_dict(ckpt['optimizer'])
                best_fitness = ckpt['best_fitness']
            # EMA
            if ema and ckpt.get('ema'):
                ema.ema.load_state_dict(ckpt['ema'].float().state_dict())
                ema.updates = ckpt['updates']
            # Results
            if ckpt.get('training_results') is not None:
                results_file.write_text(ckpt['training_results'])  # write results.txt
            # Epochs
            start_epoch = ckpt['epoch'] + 1
            if opt.resume:
                assert start_epoch > 0, '%s training to %g epochs is finished, nothing to resume.' % (weights, epochs)
            if epochs < start_epoch:
                logger.info('%s has been trained for %g epochs. Fine-tuning for %g additional epochs.' %
                            (weights, ckpt['epoch'], epochs))
                epochs += ckpt['epoch']  # finetune additional epochs
            del ckpt, state_dict
        # Image sizes
        gs = max(int(model.stride.max()), 32)  # grid size (max stride)
        nl = model.model[-1].nl  # number of detection layers (used for scaling hyp['obj'])
        imgsz, imgsz_test = [check_img_size(x, gs) for x in opt.img_size]  # verify imgsz are gs-multiples

        # DP mode
        if cuda and rank == -1 and torch.cuda.device_count() > 1:
            model = torch.nn.DataParallel(model)

        # SyncBatchNorm
        if opt.sync_bn and cuda and rank != -1:
            model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
            logger.info('Using SyncBatchNorm()')

        # Trainloader
        dataloader, dataset = create_dataloader(train_path, imgsz, batch_size, gs, opt,
                                                hyp=hyp, augment=True, cache=opt.cache_images, rect=opt.rect, rank=rank,
                                                world_size=opt.world_size, workers=opt.workers,
                                                image_weights=opt.image_weights, quad=opt.quad, prefix=colorstr('train: '))
        mlc = np.concatenate(dataset.labels, 0)[:, 0].max()  # max label class
        nb = len(dataloader)  # number of batches
        logger.info('#####bs:%d steps:%d ' % (batch_size, nb))
        assert mlc < nc, 'Label class %g exceeds nc=%g in %s. Possible class labels are 0-%g' % (mlc, nc, opt.data, nc - 1)

        # Process 0
        if rank in [-1, 0]:
            testloader = create_dataloader(test_path, imgsz_test, batch_size * 2, gs, opt,  # testloader
                                           hyp=hyp, cache=opt.cache_images and not opt.notest, rect=True, rank=-1,
                                           world_size=opt.world_size, workers=opt.workers,
                                           pad=0.5, prefix=colorstr('val: '))[0]
            if not opt.resume:
                labels = np.concatenate(dataset.labels, 0)
                c = torch.tensor(labels[:, 0])  # classes
                # cf = torch.bincount(c.long(), minlength=nc) + 1.  # frequency
                # model._initialize_biases(cf.to(device))
                if plots:
                    plot_labels(labels, names, save_dir, loggers)
                # if tb_writer:
                #     tb_writer.add_histogram('classes', c, 0)

                # Anchors
                if not opt.noautoanchor:
                    check_anchors(dataset, model=model, thr=hyp['anchor_t'], imgsz=imgsz)
                model.half().float()  # pre-reduce anchor precision

        # DDP mode
        if cuda and rank != -1:
            model = DDP(model, device_ids=[opt.local_rank], output_device=opt.local_rank,
                        # nn.MultiheadAttention incompatibility with DDP https://github.com/pytorch/pytorch/issues/26698
                        find_unused_parameters=any(isinstance(layer, nn.MultiheadAttention) for layer in model.modules()))

        # Model parameters
        hyp['box'] *= 3. / nl  # scale to layers
        hyp['cls'] *= nc / 80. * 3. / nl  # scale to classes and layers
        hyp['obj'] *= (imgsz / 640) ** 2 * 3. / nl  # scale to image size and layers
        hyp['label_smoothing'] = opt.label_smoothing
        model.nc = nc  # attach number of classes to model
        model.hyp = hyp  # attach hyperparameters to model
        model.gr = 1.0  # iou loss ratio (obj_loss = 1.0 or iou)
        model.class_weights = labels_to_class_weights(dataset.labels, nc).to(device) * nc  # attach class weights
        model.names = names
        # Start training
        t0 = time.time()
        nw = max(round(hyp['warmup_epochs'] * nb), 1000)  # number of warmup iterations, max(3 epochs, 1k iterations)
        # nw = min(nw, (epochs - start_epoch) / 2 * nb)  # limit warmup to < 1/2 of training
        maps = np.zeros(nc)  # mAP per class
        results = (0, 0, 0, 0, 0, 0, 0)  # P, R, mAP@.5, mAP@.5-.95, val_loss(box, obj, cls)
        scheduler.last_epoch = start_epoch - 1  # do not move
        scaler = amp.GradScaler(enabled=cuda)
        compute_loss = ComputeLoss(model)  # init loss class
        logger.info(f'Image sizes {imgsz} train, {imgsz_test} test\n'
                    f'Using {dataloader.num_workers} dataloader workers\n'
                    f'Logging results to {save_dir}\n'
                    f'Starting training for {epochs} epochs...')

        # Record the start time of the whole run, outside the epoch loop
        total_train_start_time = time.time()
        # Convert the timestamp to a human-readable format
        start_time_str = datetime.datetime.fromtimestamp(total_train_start_time).strftime("%Y-%m-%d %H:%M:%S.%f")
        logger.info(f"Training start time: {start_time_str}")
        for epoch in range(start_epoch, epochs):  # epoch ------------------------------------------------------------
            trainStartTime = time.time()

            # Check if we should stop
            if stop_event and stop_event.is_set():
                logger.info(f"Training stopped by request at epoch {epoch}")
                break

            model.train()

            # Update image weights (optional)
            if opt.image_weights:
                # Generate indices
                if rank in [-1, 0]:
                    cw = model.class_weights.cpu().numpy() * (1 - maps) ** 2 / nc  # class weights
                    iw = labels_to_image_weights(dataset.labels, nc=nc, class_weights=cw)  # image weights
                    dataset.indices = random.choices(range(dataset.n), weights=iw, k=dataset.n)  # rand weighted idx
                # Broadcast if DDP
                if rank != -1:
                    indices = (torch.tensor(dataset.indices) if rank == 0 else torch.zeros(dataset.n)).int()
                    dist.broadcast(indices, 0)
                    if rank != 0:
                        dataset.indices = indices.cpu().numpy()

            # Update mosaic border
            # b = int(random.uniform(0.25 * imgsz, 0.75 * imgsz + gs) // gs * gs)
            # dataset.mosaic_border = [b - imgsz, -b]  # height, width borders

            mloss = torch.zeros(4, device=device)  # mean losses
            if rank != -1:
                dataloader.sampler.set_epoch(epoch)
            pbar = enumerate(dataloader)
            logger.info(('\n' + '%10s' * 8) % ('Epoch', 'gpu_mem', 'box', 'obj', 'cls', 'total', 'labels', 'img_size'))
            if rank in [-1, 0]:
                pbar = tqdm(pbar, total=nb)  # progress bar
            optimizer.zero_grad()
            # print('+++++++++++++++++++++++++++++%s' % paths)
            for i, (imgs, targets, paths, _) in pbar:  # batch -------------------------------------------------------
                ni = i + nb * epoch  # number integrated batches (since train start)
                imgs = imgs.to(device, non_blocking=True).float() / 255.0  # uint8 to float32, 0-255 to 0.0-1.0

                # Warmup
                if ni <= nw:
                    xi = [0, nw]  # x interp
                    # model.gr = np.interp(ni, xi, [0.0, 1.0])  # iou loss ratio (obj_loss = 1.0 or iou)
                    accumulate = max(1, np.interp(ni, xi, [1, nbs / total_batch_size]).round())
                    for j, x in enumerate(optimizer.param_groups):
                        # bias lr falls from 0.1 to lr0, all other lrs rise from 0.0 to lr0
                        x['lr'] = np.interp(ni, xi, [hyp['warmup_bias_lr'] if j == 2 else 0.0, x['initial_lr'] * lf(epoch)])
                        if 'momentum' in x:
                            x['momentum'] = np.interp(ni, xi, [hyp['warmup_momentum'], hyp['momentum']])

                # Multi-scale
                if opt.multi_scale:
                    sz = random.randrange(imgsz * 0.5, imgsz * 1.5 + gs) // gs * gs  # size
                    sf = sz / max(imgs.shape[2:])  # scale factor
                    if sf != 1:
                        ns = [math.ceil(x * sf / gs) * gs for x in imgs.shape[2:]]  # new shape (stretched to gs-multiple)
                        imgs = F.interpolate(imgs, size=ns, mode='bilinear', align_corners=False)
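
                # For example, with imgsz=640 and gs=32 the multi-scale branch
                # samples a new size from roughly 0.5x to 1.5x the training
                # size, snapped to a multiple of the grid size (320, 352, ...,
                # 960), then stretches the batch to that shape.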
                # Forward
                with amp.autocast(enabled=cuda):
                    pred = model(imgs)  # forward
                    loss, loss_items = compute_loss(pred, targets.to(device))  # loss scaled by batch_size
                    if rank != -1:
                        loss *= opt.world_size  # gradient averaged between devices in DDP mode
                    if opt.quad:
                        loss *= 4.

                # Backward
                scaler.scale(loss).backward()

                # Optimize
                if ni % accumulate == 0:
                    scaler.step(optimizer)  # optimizer.step
                    scaler.update()
                    optimizer.zero_grad()
                    if ema:
                        ema.update(model)

                # Print
                if rank in [-1, 0]:
                    mloss = (mloss * i + loss_items) / (i + 1)  # update mean losses
                    mem = '%.3gG' % (torch.cuda.memory_reserved() / 1E9 if torch.cuda.is_available() else 0)  # (GB)
                    s = ('%10s' * 2 + '%10.4g' * 6) % (
                        '%g/%g' % (epoch, epochs - 1), mem, *mloss, targets.shape[0], imgs.shape[-1])
                    pbar.set_description(s)

                    # Plot
                    if plots and ni < 3:
                        f = save_dir / f'train_batch{ni}.jpg'  # filename
                        Thread(target=plot_images, args=(imgs, targets, paths, f), daemon=True).start()
                        # if tb_writer:
                        #     tb_writer.add_image(f, result, dataformats='HWC', global_step=epoch)
                        #     tb_writer.add_graph(torch.jit.trace(model, imgs, strict=False), [])  # add model graph
                    elif plots and ni == 10 and wandb_logger.wandb:
                        wandb_logger.log({"Mosaics": [wandb_logger.wandb.Image(str(x), caption=x.name) for x in
                                                      save_dir.glob('train*.jpg') if x.exists()]})
                # end batch --------------------------------------------------------------------------------------
            # end epoch ----------------------------------------------------------------------------------------------

            # Scheduler
            lr = [x['lr'] for x in optimizer.param_groups]  # for tensorboard
            scheduler.step()

            # DDP process 0 or single-GPU
            if rank in [-1, 0]:
                # mAP
                ema.update_attr(model, include=['yaml', 'nc', 'hyp', 'gr', 'names', 'stride', 'class_weights'])
                final_epoch = epoch + 1 == epochs
                if not opt.notest or final_epoch:  # Calculate mAP
                    wandb_logger.current_epoch = epoch + 1
                    results, maps, times = test.test(data_dict,
                                                     batch_size=batch_size * 2,
                                                     imgsz=imgsz_test,
                                                     model=ema.ema,
                                                     single_cls=opt.single_cls,
                                                     dataloader=testloader,
                                                     save_dir=save_dir,
                                                     verbose=nc < 50 and final_epoch,
                                                     plots=plots and final_epoch,
                                                     wandb_logger=wandb_logger,
                                                     compute_loss=compute_loss,
                                                     is_coco=is_coco)

                # Write
                with open(results_file, 'a') as f:
                    f.write(s + '%10.4g' * 7 % results + '\n')  # append metrics, val_loss
                if len(opt.name) and opt.bucket:
                    os.system('gsutil cp %s gs://%s/results/results%s.txt' % (results_file, opt.bucket, opt.name))
                # Progress percentage (two decimal places)
                progress_percentage = f"{(epoch + 1) / epochs * 100:.2f}%"

                # Metric reporting: HTTP POST to the web backend plus a Kafka message
                if producer:
                    try:
                        # Elapsed time for the whole run so far
                        current_time = time.time()
                        elapsed_seconds = current_time - total_train_start_time
                        elapsed_time = datetime.timedelta(seconds=elapsed_seconds)
                        elapsed_minutes = int(1 + elapsed_seconds // 60)  # whole minutes, rounded up
                        logger.info(f"Epoch {epoch + 1}/{epochs} - elapsed: {elapsed_minutes} min")
                        # Convert the timestamp to a human-readable format
                        current_time_str = datetime.datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S.%f")

                        Accuracy_percent = f"{results[0]:.2%}"  # precision
                        Recall_percent = f"{results[1]:.2%}"  # recall
                        map_percent = f"{results[2]:.2%}"  # mAP@0.5
                        monitor_data = {
                            "id": task_id,
                            "dspModelCode": dspModelCode,
                            "algorithmMap": map_percent,
                            "algorithmAccuracy": Accuracy_percent,
                            "algorithmRecall": Recall_percent,
                            "aiVersion": version,
                            "progress": progress_percentage,
                            "trainStartTime": start_time_str,
                            "trainEndTime": current_time_str,
                            "trainDuration": str(elapsed_minutes),  # duration in minutes
                            "status": 4 if progress_percentage == "100.00%" else 2
                        }
                        logger.debug(f"Monitor data: {monitor_data}")
                        # Final upload path:
                        # http://172.18.0.129:8084/algorithm/027/V1.1/best.pt
                        # jcq: push the data to the web backend - 0508
                        try:
                            # POST with custom headers and a JSON body
                            response = requests.post(url, headers=headers, json=monitor_data)
                            # Check the response status code
                            response.raise_for_status()
                            logger.debug(f"Status code: {response.status_code}, body: {response.text}")
                        except requests.exceptions.HTTPError as http_err:
                            logger.error(f"HTTP error occurred: {http_err}")
                        except requests.exceptions.RequestException as req_err:
                            logger.error(f"Request error occurred: {req_err}")

                        future = producer.send(
                            'train_monitor',
                            value=monitor_data,
                            timestamp_ms=int(time.time() * 1000))

                        def on_success(metadata):
                            logger.debug(f"Message delivered to {metadata.topic}[{metadata.partition}]")

                        def on_error(err):
                            logger.error(f"Delivery failed: {err}")

                        future.add_callback(on_success)
                        future.add_errback(on_error)
                    except Exception as e:
                        logger.error(f"Error sending metrics: {e}")
                        # Mark the producer unusable; it is rebuilt on next use
                        server.kafka_producer = None
                # Log
                tags = ['train/box_loss', 'train/obj_loss', 'train/cls_loss',  # train loss
                        'metrics/precision', 'metrics/recall', 'metrics/mAP_0.5', 'metrics/mAP_0.5:0.95',
                        'val/box_loss', 'val/obj_loss', 'val/cls_loss',  # val loss
                        'x/lr0', 'x/lr1', 'x/lr2']  # params
                for x, tag in zip(list(mloss[:-1]) + list(results) + lr, tags):
                    if tb_writer:
                        tb_writer.add_scalar(tag, x, epoch)  # tensorboard
                    if wandb_logger.wandb:
                        wandb_logger.log({tag: x})  # W&B

                # Update best mAP
                fi = fitness(np.array(results).reshape(1, -1))  # weighted combination of [P, R, mAP@.5, mAP@.5-.95]
                if fi > best_fitness:
                    best_fitness = fi
                wandb_logger.end_epoch(best_result=best_fitness == fi)

                # Save model
                if (not opt.nosave) or (final_epoch and not opt.evolve):  # if save
                    ckpt = {'epoch': epoch,
                            'best_fitness': best_fitness,
                            'training_results': results_file.read_text(),
                            'model': deepcopy(model.module if is_parallel(model) else model).half(),
                            'ema': deepcopy(ema.ema).half(),
                            'updates': ema.updates,
                            'optimizer': optimizer.state_dict(),
                            'wandb_id': wandb_logger.wandb_run.id if wandb_logger.wandb else None}

                    # Save last, best and delete
                    torch.save(ckpt, last)
                    if best_fitness == fi:
                        torch.save(ckpt, best)
                        torch.save(ckpt, yolov5)
                        monitor_data = {
                            "id": task_id,
                            "dspModelCode": dspModelCode,
                            "algorithmMap": map_percent,
                            "algorithmAccuracy": Accuracy_percent,
                            "algorithmRecall": Recall_percent,
                            "aiVersion": version,
                            "progress": progress_percentage,
                            "trainStartTime": start_time_str,
                            "trainEndTime": current_time_str,
                            "trainDuration": str(elapsed_minutes),  # duration in minutes
                            "status": 2 if progress_percentage == "100.00%" else 4
                        }
                        aa = best  # local file path
                        # Build the bucket name and object path
                        bucket_name = "algorithm"  # may need adjusting for the deployment
                        weights_prefix = "weights"  # renamed to avoid shadowing the `weights` variable
                        object_path = f"{weights_prefix}/{dspModelCode}/{version}/yolov5.pt"  # storage path in MinIO
                        try:
                            # Upload the file to MinIO
                            result = minioClient.fput_object(
                                bucket_name=bucket_name,
                                object_name=object_path,  # target path
                                file_path=aa,  # local file path
                            )
                            logger.info("File uploaded successfully:")
                            logger.info(f"Bucket: {result.bucket_name}")
                            logger.info(f"Object: {result.object_name}")
                            logger.info(f"ETag: {result.etag}")
                            # Check whether the bucket exists
                            if minioClient.bucket_exists("th-dsp"):
                                logger.info("th-dsp exists")
                            else:
                                logger.info("th-dsp does not exist")
                        except Exception as e:
                            logger.error(f"Upload failed: {e}")
                    if wandb_logger.wandb:
                        if ((epoch + 1) % opt.save_period == 0 and not final_epoch) and opt.save_period != -1:
                            wandb_logger.log_model(
                                last.parent, opt, epoch, fi, best_model=best_fitness == fi)
                    del ckpt
            # end epoch ------------------------------------------------------------------------------------------
        # end training

        if rank in [-1, 0]:
            # Plots
            if plots:
                plot_results(save_dir=save_dir)  # save as results.png
                if wandb_logger.wandb:
                    files = ['results.png', 'confusion_matrix.png', *[f'{x}_curve.png' for x in ('F1', 'PR', 'P', 'R')]]
                    wandb_logger.log({"Results": [wandb_logger.wandb.Image(str(save_dir / f), caption=f) for f in files
                                                  if (save_dir / f).exists()]})
            # Test best.pt
            logger.info('%g epochs completed in %.3f hours.\n' % (epoch - start_epoch + 1, (time.time() - t0) / 3600))
            if opt.data.endswith('coco.yaml') and nc == 80:  # if COCO
                for m in (last, best) if best.exists() else (last,):  # speed, mAP tests
                    results, _, _ = test.test(opt.data,
                                              batch_size=batch_size * 2,
                                              imgsz=imgsz_test,
                                              conf_thres=0.001,
                                              iou_thres=0.7,
                                              model=attempt_load(m, device).half(),
                                              single_cls=opt.single_cls,
                                              dataloader=testloader,
                                              save_dir=save_dir,
                                              save_json=True,
                                              plots=False,
                                              is_coco=is_coco)

            # Strip optimizers
            final = best if best.exists() else last  # final model
            for f in last, best:
                if f.exists():
                    strip_optimizer(f)  # strip optimizers
            if opt.bucket:
                os.system(f'gsutil cp {final} gs://{opt.bucket}/weights')  # upload
            if wandb_logger.wandb and not opt.evolve:  # Log the stripped model
                wandb_logger.wandb.log_artifact(str(final), type='model',
                                                name='run_' + wandb_logger.wandb_run.id + '_model',
                                                aliases=['last', 'best', 'stripped'])
            wandb_logger.finish_run()
        else:
            dist.destroy_process_group()

        torch.cuda.empty_cache()
        return results
    except Exception as e:
        logger.error(f"Training error occurred: {str(e)}", exc_info=True)
        # Build the error response payload
        error_data = {
            "id": task_id,
            "status": 3,  # error status code
            "error": str(e)
        }
        # Send the error notification to the API
        try:
            response = requests.post(
                "http://172.18.0.129:8084/api/admin/trainTask/updateTrainTask",
                headers=headers,
                json=error_data
            )
            response.raise_for_status()
            logger.info(f"Successfully sent error notification for task {task_id}")
        except requests.exceptions.RequestException as api_err:
            logger.error(f"Failed to send error notification: {api_err}")
        # Re-raise so the outer caller can handle it
        raise
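
# The server is started directly (e.g. `python ai_auto_train_yolov5.py`); it
# then blocks on the 'training-tasks' topic at the broker configured above and
# reacts to start/stop messages until interrupted.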
if __name__ == '__main__':
    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler('yolov5_training_server.log')
        ]
    )

    # Initialize server (module-level so train() can reach it)
    server = TrainingServer()
    try:
        server.run_server()
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down...")
    except Exception as e:
        logger.error(f"Server error: {e}", exc_info=True)
    finally:
        server.shutdown_flag = True