# -*- coding: utf-8 -*- from io import BytesIO from traceback import format_exc import cv2 import requests from PIL import Image, ImageDraw, ImageFont import numpy as np from loguru import logger from enums.ExceptionEnum import ExceptionType from exception.CustomerException import ServiceException ''' 文字水印 ''' class TextWaterMark(): def __init__(self): self.color_dict = { # R G B # 网址查看:https://tool.oschina.net/commons?type=3 # 网址查看:http://tools.jb51.net/static/colorpicker/ 'white': (255, 255, 255, 255), 'black': (0, 0, 0, 255), 'gray': (205, 201, 201, 255), 'red': (255, 0, 0, 255), 'yellow': (255, 215, 0, 255), 'blue': (0, 0, 170, 255), 'purple': (205, 105, 201, 255), 'green': (0, 205, 0, 255) } self.position_list = [1, 2, 3, 4] """ 普通照片水印 params: image:图片 text:水印文字 position:水印位置 1:左上 2:右上 3:右下 4:左下 fontface: 字体 fontsize:字体大小 fontcolor:字体颜色 [white, black, gray, red, yellow, blue, purple, green] """ def common_water(self, image, text, position=1, fontface='../font/simsun.ttc', fontsize=20, fontcolor='black'): flag = False if isinstance(image, np.ndarray): # 判断是否OpenCV图片类型 flag = True image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA)) if position not in self.position_list: position = 1 w, h = image.size[:2] keys = self.color_dict.keys() if fontcolor not in keys: fontcolor = 'black' color = self.color_dict[fontcolor] fnt = ImageFont.truetype(fontface, fontsize) im = image.convert('RGBA') mask = Image.new('RGBA', im.size, (0, 0, 0, 0)) d = ImageDraw.Draw(mask) size_w, size_h = d.textsize(text, font=fnt) if position == 1: weizhi = (w * 0.1, h * 0.1) elif position == 2: weizhi = (w * 0.9 - size_w, h * 0.1) elif position == 3: weizhi = (w * 0.9 - size_w, h * 0.9 - size_h) else: weizhi = (w * 0.1, h * 0.9 - size_h) # position 为左上角位置 d.text(weizhi, text, font=fnt, fill=color) out = Image.alpha_composite(im, mask) if flag: out = cv2.cvtColor(np.asarray(out), cv2.COLOR_BGRA2RGBA) return out """ 半透明水印,布满整张图,并且自动旋转45° params: image:图片 text:文字 fontsize:文字大小 """ def fill_water(self, image, text, fontsize): flag = False if isinstance(image, np.ndarray): # 判断是否OpenCV图片类型 flag = True image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA)) font = ImageFont.truetype('../font/simsun.ttc', fontsize) # 添加背景 new_img = Image.new('RGBA', (image.size[0] * 3, image.size[1] * 3), (255, 255, 255, 255)) new_img.paste(image, image.size) # 添加水印 font_len = len(text) rgba_image = new_img.convert('RGBA') text_overlay = Image.new('RGBA', rgba_image.size, (0, 0, 0, 0)) image_draw = ImageDraw.Draw(text_overlay) for i in range(0, rgba_image.size[0], font_len * 40 + 100): for j in range(0, rgba_image.size[1], 200): # print(f'i:{i}, j:{j}, text:{text}, font:{font}') image_draw.text((i, j), text, font=font, fill=(0, 0, 0, 50)) text_overlay = text_overlay.rotate(-45) image_with_text = Image.alpha_composite(rgba_image, text_overlay) image_with_text = image_with_text.crop((image.size[0], image.size[1], image.size[0] * 2, image.size[1] * 2)) if flag: image_with_text = cv2.cvtColor(np.asarray(image_with_text), cv2.COLOR_BGRA2RGBA) return image_with_text class PictureWaterMark: __slots__ = ('logo', '__requestId') def __init__(self, logo=None, requestId=None): self.__requestId = requestId self.logo = logo if requestId is None: self.__requestId = '1' if logo is None: self.logo = cv2.imread("./image/logo.png", -1) # def common_water(self, image, logo): # width, height = image.shape[1], image.shape[0] # mark_width, mark_height = logo.shape[1], logo.shape[0] # rate = int(width * 0.2) / mark_width # logo_new = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST) # position = (int(width * 0.95 - logo_new.shape[1]), int(height * 0.95 - logo_new.shape[0])) # b = Image.new('RGBA', (width, height), (0, 0, 0, 0)) # 创建新图像:透明' # a = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) # watermark = Image.fromarray(cv2.cvtColor(logo_new, cv2.COLOR_BGRA2RGBA)) # # 图片旋转 # # watermark = watermark.rotate(45) # b.paste(a, (0, 0)) # b.paste(watermark, position, mask=watermark) # return cv2.cvtColor(np.asarray(b), cv2.COLOR_BGR2RGB) def common_water_1(self, image, logo, alpha=1): try: h, w = image.shape[0], image.shape[1] # if w >= h: rate = int(w * 0.1) / logo.shape[1] # else: # rate = int(h * 0.1) / logo.shape[0] mask = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST) mask_h, mask_w = mask.shape[0], mask.shape[1] mask_channels = cv2.split(mask) dst_channels = cv2.split(image) # b, g, r, a = cv2.split(mask) # 计算mask在图片的坐标 # if w >= h: ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w)) dr_points = (int(h * 0.95), int(w - h * 0.05)) # else: # ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w)) # dr_points = (int(h * 0.95), int(w - h * 0.05)) for i in range(3): dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] = dst_channels[i][ ul_points[0]: dr_points[0], ul_points[1]: dr_points[ 1]] * ( 255.0 - mask_channels[ 3] * alpha) / 255 dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] += np.array( mask_channels[i] * (mask_channels[3] * alpha / 255), dtype=np.uint8) dst_img = cv2.merge(dst_channels) return dst_img except Exception: logger.error("加水印异常:{}, requestId:{}", format_exc(), self.__requestId) return image def add_water_pic(image, logo, requestId, alpha=1): try: h, w = image.shape[0], image.shape[1] # if w >= h: rate = int(w * 0.1) / logo.shape[1] # else: # rate = int(h * 0.1) / logo.shape[0] mask = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST) mask_h, mask_w = mask.shape[0], mask.shape[1] mask_channels = cv2.split(mask) dst_channels = cv2.split(image) # b, g, r, a = cv2.split(mask) # 计算mask在图片的坐标 # if w >= h: ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w)) dr_points = (int(h * 0.95), int(w - h * 0.05)) # else: # ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w)) # dr_points = (int(h * 0.95), int(w - h * 0.05)) for i in range(3): dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] = dst_channels[i][ ul_points[0]: dr_points[0], ul_points[1]: dr_points[ 1]] * ( 255.0 - mask_channels[ 3] * alpha) / 255 dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] += np.array( mask_channels[i] * (mask_channels[3] * alpha / 255), dtype=np.uint8) dst_img = cv2.merge(dst_channels) return dst_img except Exception: logger.error("加水印异常:{}, requestId:{}", format_exc(), requestId) return image # 差值感知算法 def dHash(image): # 缩放9*8 image = cv2.resize(image, (9, 8), interpolation=cv2.INTER_CUBIC) # 转换灰度图 image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # print(image.shape) hash = [] # 每行前一个像素大于后一个像素为1,相反为0,生成哈希 for i in range(8): for j in range(8): if image[i, j] > image[i, j + 1]: hash.append(1) else: hash.append(0) return hash # 计算汉明距离 def Hamming_distance(hash1, hash2): num = 0 for index in range(len(hash1)): if hash1[index] != hash2[index]: num += 1 return num def url2Array(url, enable_ex=True): try: response = requests.get(url) image = Image.open(BytesIO(response.content)) image1 = np.array(image) img_bgr = cv2.cvtColor(image1, cv2.COLOR_RGB2BGR) return img_bgr except Exception: logger.exception("url地址请求异常: {}", format_exc()) if enable_ex: raise ServiceException(ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[0], ExceptionType.URL_ADDRESS_ACCESS_FAILED.value[1]) return None def url2Content(url): response = requests.get(url) return response.content def url2Image(url): response = requests.get(url) image = Image.open(BytesIO(response.content)) image1 = np.array(image) img_bgr = cv2.cvtColor(image1, cv2.COLOR_RGB2BGR) img = Image.fromarray(img_bgr) return img def url2Byte(url): response = requests.get(url) return BytesIO(response.content) def markRectangle(url, text, textCoordinate, imageLeftUpCoordinate, imageRightDownCoordinate, color): img = url2Array(url) # ( 蓝, 绿, 红) # 红色 (0, 0, 255) # 洋红色 (255, 0, 255) # 青色 (255, 255, 0) # 黑色 (0, 0, 0) # 蓝色 (255, 0, 0) # 绿色 (0, 255, 0) # 黄色 (0, 255, 255) # 不考虑 cv2.putText(img, text, textCoordinate, cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 1, cv2.LINE_AA) # rectangle 坐标的参数格式为左上角(x1, y1),右下角(x2, y2), 颜色 , 粗细 cv2.rectangle(img, imageLeftUpCoordinate, imageRightDownCoordinate, color, 2) return img # def draw_painting_joint(img, xywh, score=0.5, color=None, # font={'line_thickness': None, 'boxLine_thickness': None, 'fontSize': None}): # imh, imw, imc = img.shape # tl = font['line_thickness'] or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # box_tl = font['boxLine_thickness'] or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # 根据图像的尺寸计算出适合用于绘制图像边框的线宽。 # c1, c2 = (int(xywh[0]), int(xywh[1])), (int(xywh[2]), int(xywh[3])) # cv2.rectangle(img, c1, c2, color, thickness=box_tl, lineType=cv2.LINE_AA) # # label = ' %.2f' % (score) # tf = max(tl, 1) # font thickness # fontScale = font['fontSize'] or tl * 0.33 # t_size = cv2.getTextSize(label, 0, fontScale=fontScale, thickness=tf)[0] # cv2.rectangle(img, (int(box[0]) + lw, int(box[1])), c2, color, -1, cv2.LINE_AA) # filled # cv2.putText(img, label, (c1[0] + lw, c1[1] - (lh - t_size[1]) // 2), 0, fontScale, [225, 255, 255], thickness=tf, # lineType=cv2.LINE_AA) # # print('#####line224 fontScale:',fontScale,' thickness:',tf,' line_thickness:',font['line_thickness'],' boxLine thickness:',box_tl) # return img def img_pad(img, size, pad_value=[114, 114, 114]): ###填充成固定尺寸 H, W, _ = img.shape r = max(H / size[0], W / size[1]) img_r = cv2.resize(img, (int(W / r), int(H / r))) tb = size[0] - img_r.shape[0] lr = size[1] - img_r.shape[1] top = int(tb / 2) bottom = tb - top left = int(lr / 2) right = lr - left pad_image = cv2.copyMakeBorder(img_r, top, bottom, left, right, cv2.BORDER_CONSTANT, value=pad_value) return pad_image, (top, left, r) def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32): # 调整图像大小和填充图像,同时满足步幅多重约束 shape = img.shape[:2] # current shape [height, width] 当前形状 [高度、宽度] if isinstance(new_shape, int): new_shape = (new_shape, new_shape) # Scale ratio (new / old) 缩放比例(新/旧) r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) if not scaleup: # 仅缩减,不纵向扩展(为了更好的测试 mAP) r = min(r, 1.0) ratio = r, r # width, height ratios 宽度、高度比 new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding if auto: # 最小矩形 dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding elif scaleFill: # stretch dw, dh = 0.0, 0.0 new_unpad = (new_shape[1], new_shape[0]) ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios dw /= 2 # divide padding into 2 sides dh /= 2 if shape[::-1] != new_unpad: # resize img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR) top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border return img, ratio, (dw, dh) # if __name__ == '__main__': # # img = cv2.imread("../test/a.jpg", -1) # # fontcolor = 'yellow' # # # # water = TextWaterMark() # # text = "hello world" # # # # # fill_img = water.common_water(img, text, position=4, fontface='../font/庞门正道标题体2.0增强版.ttf', fontsize=20, fontcolor='black') # # fill_img = water.fill_water(img, text, 20) # # # 一定要保存为png格式 # # cv2.imshow('result', fill_img) # # cv2.waitKey(111111110) # # print('finish') # pic = PictureWaterMark() # image = cv2.imread("a.jpg") # logo = cv2.imread("../image/logo.png", -1) # # print(image, logo) # start = time.time() # frame = pic.common_water(image, logo) # print(time.time() - start) # start1 = time.time() # frame1 = pic.common_water_1(image, logo) # # cv2.imwrite("watermarked.jpg", frame1) # print(time.time() - start1) # # cap = cv2.VideoCapture("../data/111111.mp4") # # logo = cv2.imread("../image/logo.png", -1) # # while True: # # is_opened, frame = cap.read() # # frame = pic.common_water(frame, logo) # # cv2.imshow('frame', frame) # # cv2.waitKey(1) # 等待输入任何按键 # # # 关闭 # # cap.release()