用kafka接收消息
Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

před 2 roky
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import json
  2. import sys
  3. from pathlib import Path
  4. import torch
  5. import yaml
  6. from tqdm import tqdm
  7. sys.path.append(str(Path(__file__).parent.parent.parent)) # add utils/ to path
  8. from utils.datasets import LoadImagesAndLabels
  9. from utils.datasets import img2label_paths
  10. from utils.general import colorstr, xywh2xyxy, check_dataset
  11. try:
  12. import wandb
  13. from wandb import init, finish
  14. except ImportError:
  15. wandb = None
  16. WANDB_ARTIFACT_PREFIX = 'wandb-artifact://'
  17. def remove_prefix(from_string, prefix=WANDB_ARTIFACT_PREFIX):
  18. return from_string[len(prefix):]
  19. def check_wandb_config_file(data_config_file):
  20. wandb_config = '_wandb.'.join(data_config_file.rsplit('.', 1)) # updated data.yaml path
  21. if Path(wandb_config).is_file():
  22. return wandb_config
  23. return data_config_file
  24. def get_run_info(run_path):
  25. run_path = Path(remove_prefix(run_path, WANDB_ARTIFACT_PREFIX))
  26. run_id = run_path.stem
  27. project = run_path.parent.stem
  28. model_artifact_name = 'run_' + run_id + '_model'
  29. return run_id, project, model_artifact_name
  30. def check_wandb_resume(opt):
  31. process_wandb_config_ddp_mode(opt) if opt.global_rank not in [-1, 0] else None
  32. if isinstance(opt.resume, str):
  33. if opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
  34. if opt.global_rank not in [-1, 0]: # For resuming DDP runs
  35. run_id, project, model_artifact_name = get_run_info(opt.resume)
  36. api = wandb.Api()
  37. artifact = api.artifact(project + '/' + model_artifact_name + ':latest')
  38. modeldir = artifact.download()
  39. opt.weights = str(Path(modeldir) / "last.pt")
  40. return True
  41. return None
  42. def process_wandb_config_ddp_mode(opt):
  43. with open(opt.data) as f:
  44. data_dict = yaml.load(f, Loader=yaml.SafeLoader) # data dict
  45. train_dir, val_dir = None, None
  46. if isinstance(data_dict['train'], str) and data_dict['train'].startswith(WANDB_ARTIFACT_PREFIX):
  47. api = wandb.Api()
  48. train_artifact = api.artifact(remove_prefix(data_dict['train']) + ':' + opt.artifact_alias)
  49. train_dir = train_artifact.download()
  50. train_path = Path(train_dir) / 'data/images/'
  51. data_dict['train'] = str(train_path)
  52. if isinstance(data_dict['val'], str) and data_dict['val'].startswith(WANDB_ARTIFACT_PREFIX):
  53. api = wandb.Api()
  54. val_artifact = api.artifact(remove_prefix(data_dict['val']) + ':' + opt.artifact_alias)
  55. val_dir = val_artifact.download()
  56. val_path = Path(val_dir) / 'data/images/'
  57. data_dict['val'] = str(val_path)
  58. if train_dir or val_dir:
  59. ddp_data_path = str(Path(val_dir) / 'wandb_local_data.yaml')
  60. with open(ddp_data_path, 'w') as f:
  61. yaml.dump(data_dict, f)
  62. opt.data = ddp_data_path
  63. class WandbLogger():
  64. def __init__(self, opt, name, run_id, data_dict, job_type='Training'):
  65. # Pre-training routine --
  66. self.job_type = job_type
  67. self.wandb, self.wandb_run, self.data_dict = wandb, None if not wandb else wandb.run, data_dict
  68. # It's more elegant to stick to 1 wandb.init call, but useful config data is overwritten in the WandbLogger's wandb.init call
  69. if isinstance(opt.resume, str): # checks resume from artifact
  70. if opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
  71. run_id, project, model_artifact_name = get_run_info(opt.resume)
  72. model_artifact_name = WANDB_ARTIFACT_PREFIX + model_artifact_name
  73. assert wandb, 'install wandb to resume wandb runs'
  74. # Resume wandb-artifact:// runs here| workaround for not overwriting wandb.config
  75. self.wandb_run = wandb.init(id=run_id, project=project, resume='allow')
  76. opt.resume = model_artifact_name
  77. elif self.wandb:
  78. self.wandb_run = wandb.init(config=opt,
  79. resume="allow",
  80. project='YOLOv5' if opt.project == 'runs/train' else Path(opt.project).stem,
  81. name=name,
  82. job_type=job_type,
  83. id=run_id) if not wandb.run else wandb.run
  84. if self.wandb_run:
  85. if self.job_type == 'Training':
  86. if not opt.resume:
  87. wandb_data_dict = self.check_and_upload_dataset(opt) if opt.upload_dataset else data_dict
  88. # Info useful for resuming from artifacts
  89. self.wandb_run.config.opt = vars(opt)
  90. self.wandb_run.config.data_dict = wandb_data_dict
  91. self.data_dict = self.setup_training(opt, data_dict)
  92. if self.job_type == 'Dataset Creation':
  93. self.data_dict = self.check_and_upload_dataset(opt)
  94. else:
  95. prefix = colorstr('wandb: ')
  96. print(f"{prefix}Install Weights & Biases for YOLOv5 logging with 'pip install wandb' (recommended)")
  97. def check_and_upload_dataset(self, opt):
  98. assert wandb, 'Install wandb to upload dataset'
  99. check_dataset(self.data_dict)
  100. config_path = self.log_dataset_artifact(opt.data,
  101. opt.single_cls,
  102. 'YOLOv5' if opt.project == 'runs/train' else Path(opt.project).stem)
  103. print("Created dataset config file ", config_path)
  104. with open(config_path) as f:
  105. wandb_data_dict = yaml.load(f, Loader=yaml.SafeLoader)
  106. return wandb_data_dict
  107. def setup_training(self, opt, data_dict):
  108. self.log_dict, self.current_epoch, self.log_imgs = {}, 0, 16 # Logging Constants
  109. self.bbox_interval = opt.bbox_interval
  110. if isinstance(opt.resume, str):
  111. modeldir, _ = self.download_model_artifact(opt)
  112. if modeldir:
  113. self.weights = Path(modeldir) / "last.pt"
  114. config = self.wandb_run.config
  115. opt.weights, opt.save_period, opt.batch_size, opt.bbox_interval, opt.epochs, opt.hyp = str(
  116. self.weights), config.save_period, config.total_batch_size, config.bbox_interval, config.epochs, \
  117. config.opt['hyp']
  118. data_dict = dict(self.wandb_run.config.data_dict) # eliminates the need for config file to resume
  119. if 'val_artifact' not in self.__dict__: # If --upload_dataset is set, use the existing artifact, don't download
  120. self.train_artifact_path, self.train_artifact = self.download_dataset_artifact(data_dict.get('train'),
  121. opt.artifact_alias)
  122. self.val_artifact_path, self.val_artifact = self.download_dataset_artifact(data_dict.get('val'),
  123. opt.artifact_alias)
  124. self.result_artifact, self.result_table, self.val_table, self.weights = None, None, None, None
  125. if self.train_artifact_path is not None:
  126. train_path = Path(self.train_artifact_path) / 'data/images/'
  127. data_dict['train'] = str(train_path)
  128. if self.val_artifact_path is not None:
  129. val_path = Path(self.val_artifact_path) / 'data/images/'
  130. data_dict['val'] = str(val_path)
  131. self.val_table = self.val_artifact.get("val")
  132. self.map_val_table_path()
  133. if self.val_artifact is not None:
  134. self.result_artifact = wandb.Artifact("run_" + wandb.run.id + "_progress", "evaluation")
  135. self.result_table = wandb.Table(["epoch", "id", "prediction", "avg_confidence"])
  136. if opt.bbox_interval == -1:
  137. self.bbox_interval = opt.bbox_interval = (opt.epochs // 10) if opt.epochs > 10 else 1
  138. return data_dict
  139. def download_dataset_artifact(self, path, alias):
  140. if isinstance(path, str) and path.startswith(WANDB_ARTIFACT_PREFIX):
  141. dataset_artifact = wandb.use_artifact(remove_prefix(path, WANDB_ARTIFACT_PREFIX) + ":" + alias)
  142. assert dataset_artifact is not None, "'Error: W&B dataset artifact doesn\'t exist'"
  143. datadir = dataset_artifact.download()
  144. return datadir, dataset_artifact
  145. return None, None
  146. def download_model_artifact(self, opt):
  147. if opt.resume.startswith(WANDB_ARTIFACT_PREFIX):
  148. model_artifact = wandb.use_artifact(remove_prefix(opt.resume, WANDB_ARTIFACT_PREFIX) + ":latest")
  149. assert model_artifact is not None, 'Error: W&B model artifact doesn\'t exist'
  150. modeldir = model_artifact.download()
  151. epochs_trained = model_artifact.metadata.get('epochs_trained')
  152. total_epochs = model_artifact.metadata.get('total_epochs')
  153. assert epochs_trained < total_epochs, 'training to %g epochs is finished, nothing to resume.' % (
  154. total_epochs)
  155. return modeldir, model_artifact
  156. return None, None
  157. def log_model(self, path, opt, epoch, fitness_score, best_model=False):
  158. model_artifact = wandb.Artifact('run_' + wandb.run.id + '_model', type='model', metadata={
  159. 'original_url': str(path),
  160. 'epochs_trained': epoch + 1,
  161. 'save period': opt.save_period,
  162. 'project': opt.project,
  163. 'total_epochs': opt.epochs,
  164. 'fitness_score': fitness_score
  165. })
  166. model_artifact.add_file(str(path / 'last.pt'), name='last.pt')
  167. wandb.log_artifact(model_artifact,
  168. aliases=['latest', 'epoch ' + str(self.current_epoch), 'best' if best_model else ''])
  169. print("Saving model artifact on epoch ", epoch + 1)
  170. def log_dataset_artifact(self, data_file, single_cls, project, overwrite_config=False):
  171. with open(data_file) as f:
  172. data = yaml.load(f, Loader=yaml.SafeLoader) # data dict
  173. nc, names = (1, ['item']) if single_cls else (int(data['nc']), data['names'])
  174. names = {k: v for k, v in enumerate(names)} # to index dictionary
  175. self.train_artifact = self.create_dataset_table(LoadImagesAndLabels(
  176. data['train']), names, name='train') if data.get('train') else None
  177. self.val_artifact = self.create_dataset_table(LoadImagesAndLabels(
  178. data['val']), names, name='val') if data.get('val') else None
  179. if data.get('train'):
  180. data['train'] = WANDB_ARTIFACT_PREFIX + str(Path(project) / 'train')
  181. if data.get('val'):
  182. data['val'] = WANDB_ARTIFACT_PREFIX + str(Path(project) / 'val')
  183. path = data_file if overwrite_config else '_wandb.'.join(data_file.rsplit('.', 1)) # updated data.yaml path
  184. data.pop('download', None)
  185. with open(path, 'w') as f:
  186. yaml.dump(data, f)
  187. if self.job_type == 'Training': # builds correct artifact pipeline graph
  188. self.wandb_run.use_artifact(self.val_artifact)
  189. self.wandb_run.use_artifact(self.train_artifact)
  190. self.val_artifact.wait()
  191. self.val_table = self.val_artifact.get('val')
  192. self.map_val_table_path()
  193. else:
  194. self.wandb_run.log_artifact(self.train_artifact)
  195. self.wandb_run.log_artifact(self.val_artifact)
  196. return path
  197. def map_val_table_path(self):
  198. self.val_table_map = {}
  199. print("Mapping dataset")
  200. for i, data in enumerate(tqdm(self.val_table.data)):
  201. self.val_table_map[data[3]] = data[0]
  202. def create_dataset_table(self, dataset, class_to_id, name='dataset'):
  203. # TODO: Explore multiprocessing to slpit this loop parallely| This is essential for speeding up the the logging
  204. artifact = wandb.Artifact(name=name, type="dataset")
  205. img_files = tqdm([dataset.path]) if isinstance(dataset.path, str) and Path(dataset.path).is_dir() else None
  206. img_files = tqdm(dataset.img_files) if not img_files else img_files
  207. for img_file in img_files:
  208. if Path(img_file).is_dir():
  209. artifact.add_dir(img_file, name='data/images')
  210. labels_path = 'labels'.join(dataset.path.rsplit('images', 1))
  211. artifact.add_dir(labels_path, name='data/labels')
  212. else:
  213. artifact.add_file(img_file, name='data/images/' + Path(img_file).name)
  214. label_file = Path(img2label_paths([img_file])[0])
  215. artifact.add_file(str(label_file),
  216. name='data/labels/' + label_file.name) if label_file.exists() else None
  217. table = wandb.Table(columns=["id", "train_image", "Classes", "name"])
  218. class_set = wandb.Classes([{'id': id, 'name': name} for id, name in class_to_id.items()])
  219. for si, (img, labels, paths, shapes) in enumerate(tqdm(dataset)):
  220. height, width = shapes[0]
  221. labels[:, 2:] = (xywh2xyxy(labels[:, 2:].view(-1, 4))) * torch.Tensor([width, height, width, height])
  222. box_data, img_classes = [], {}
  223. for cls, *xyxy in labels[:, 1:].tolist():
  224. cls = int(cls)
  225. box_data.append({"position": {"minX": xyxy[0], "minY": xyxy[1], "maxX": xyxy[2], "maxY": xyxy[3]},
  226. "class_id": cls,
  227. "box_caption": "%s" % (class_to_id[cls]),
  228. "scores": {"acc": 1},
  229. "domain": "pixel"})
  230. img_classes[cls] = class_to_id[cls]
  231. boxes = {"ground_truth": {"box_data": box_data, "class_labels": class_to_id}} # inference-space
  232. table.add_data(si, wandb.Image(paths, classes=class_set, boxes=boxes), json.dumps(img_classes),
  233. Path(paths).name)
  234. artifact.add(table, name)
  235. return artifact
  236. def log_training_progress(self, predn, path, names):
  237. if self.val_table and self.result_table:
  238. class_set = wandb.Classes([{'id': id, 'name': name} for id, name in names.items()])
  239. box_data = []
  240. total_conf = 0
  241. for *xyxy, conf, cls in predn.tolist():
  242. if conf >= 0.25:
  243. box_data.append(
  244. {"position": {"minX": xyxy[0], "minY": xyxy[1], "maxX": xyxy[2], "maxY": xyxy[3]},
  245. "class_id": int(cls),
  246. "box_caption": "%s %.3f" % (names[cls], conf),
  247. "scores": {"class_score": conf},
  248. "domain": "pixel"})
  249. total_conf = total_conf + conf
  250. boxes = {"predictions": {"box_data": box_data, "class_labels": names}} # inference-space
  251. id = self.val_table_map[Path(path).name]
  252. self.result_table.add_data(self.current_epoch,
  253. id,
  254. wandb.Image(self.val_table.data[id][1], boxes=boxes, classes=class_set),
  255. total_conf / max(1, len(box_data))
  256. )
  257. def log(self, log_dict):
  258. if self.wandb_run:
  259. for key, value in log_dict.items():
  260. self.log_dict[key] = value
  261. def end_epoch(self, best_result=False):
  262. if self.wandb_run:
  263. wandb.log(self.log_dict)
  264. self.log_dict = {}
  265. if self.result_artifact:
  266. train_results = wandb.JoinedTable(self.val_table, self.result_table, "id")
  267. self.result_artifact.add(train_results, 'result')
  268. wandb.log_artifact(self.result_artifact, aliases=['latest', 'epoch ' + str(self.current_epoch),
  269. ('best' if best_result else '')])
  270. self.result_table = wandb.Table(["epoch", "id", "prediction", "avg_confidence"])
  271. self.result_artifact = wandb.Artifact("run_" + wandb.run.id + "_progress", "evaluation")
  272. def finish_run(self):
  273. if self.wandb_run:
  274. if self.log_dict:
  275. wandb.log(self.log_dict)
  276. wandb.run.finish()