NOTE:代码仅用来参考,没时间解释啦!

🍉一、自动从数据库中抽取数据。

入职算法工程师后敲的非常有趣使用的小工具-LMLPHP

在某台服务器中,从存放数据集的数据库自动抽取标注好的数据标签,这一步操作有什么用呢?当我们发现我们数据不均衡的时候,就如上图右边部分。我们可以从数据库中抽取缺少的数据标签进行填充。

import os
import shutil
# from get_structs import print_file_structure
import random

def print_file_structure(file_path, indent=''):
    """Print a shallow, one-level view of `file_path`.

    For a regular file only its base name is printed. For a directory the
    base name is printed, followed by every immediate sub-directory and by
    the first regular file of the top level only (the listing is not
    recursive). Any other path prints an "invalid path" message.

    Args:
        file_path: Path of the file or directory to describe.
        indent: Prefix put in front of every printed line.
    """
    path_is_file = os.path.isfile(file_path)
    if not path_is_file and not os.path.isdir(file_path):
        print('无效的文件路径')
        return

    print(indent + '├── ' + os.path.basename(file_path))
    if path_is_file:
        return

    # Only the first tuple produced by os.walk (the top level) is used.
    _top, sub_dirs, plain_files = next(os.walk(file_path), (None, [], []))
    for dir_name in sub_dirs:
        print(indent + '│   ├── ' + dir_name)
    # Just one file is shown, as a sample of the directory content.
    if plain_files:
        print(indent + '│   └── ' + plain_files[0])

def from_dataset_get_data_label(source_dataset_path, label):
    """Interactively sample annotated images of one label into the working dir.

    The function looks for `label` among the entries of `source_dataset_path`,
    collects every ``.jpg`` that has a same-named ``.xml`` in the label's
    direct sub-folders, asks on stdin how many pairs to draw at random and
    whether to copy (``cp``) or move (``mv``) them, then places the images in
    ``<cwd>/images`` and the XML files in ``<cwd>/Annotations``.

    Args:
        source_dataset_path: Directory whose entries are label names.
        label: Name of the label directory to sample from.

    Returns:
        None. When `label` is not an entry of `source_dataset_path` a message
        is printed and nothing else happens.
    """
    if label not in os.listdir(source_dataset_path):
        print("您输入的标签名无效,不存在于test子目录中!")
        return

    target_path = os.path.join(source_dataset_path, label)
    label_lenght = count_jpg_files(target_path)
    print("<{}>标签的数量统计为:【{}】".format(label, label_lenght))
    print('------------------------------------')

    all_need_img_paths, all_need_xml_paths = _collect_annotated_images(target_path)
    available = len(all_need_img_paths)
    print("统计有xml的图片数量:", available)
    print('------------------------------------')

    # Keep asking until the answer parses as an integer instead of crashing.
    while True:
        try:
            get_num = int(input("请输入您要随机抽取的数据数量:"))
            break
        except ValueError:
            print("[ERROR]请输入一个整数!")
    print('------------------------------------')
    # Clamp to what is really available: random.sample raises for a negative
    # size or for a size larger than the population.
    get_num = max(0, min(get_num, available))
    random_indexs = random.sample(range(available), get_num)

    print("请注意!所有文件都会复制到工作目录,请慎重选择工作目录。")
    print('------------------------------------')
    opt = input("请选择您的移动方式:[cp/mv]")
    print('------------------------------------')
    while opt not in ['cp', 'mv']:
        opt = input("[ERROR]请选择您的移动方式:[cp/mv]")
        print('------------------------------------')

    # Resolve the destination once, before the loop, so the summary below can
    # always report it (even when zero files are transferred).
    wd = os.getcwd()
    images_dir = os.path.join(wd, 'images')
    annotations_dir = os.path.join(wd, 'Annotations')
    os.makedirs(images_dir, exist_ok=True)
    os.makedirs(annotations_dir, exist_ok=True)

    transfer = shutil.copyfile if opt == 'cp' else shutil.move
    for inx in random_indexs:
        img_path = all_need_img_paths[inx]
        xml_path = all_need_xml_paths[inx]
        # NOTE(review): files are flattened by base name, so two sub-folders
        # holding the same file name overwrite each other in the destination.
        transfer(img_path, os.path.join(images_dir, os.path.basename(img_path)))
        transfer(xml_path, os.path.join(annotations_dir, os.path.basename(xml_path)))

    print("在上列操作中您选择了{}标签,从中抽取了{}数据量,并且使用{}方式放到了{}工作目录下。".format(label, get_num, opt, wd))
    print('------------------------------------')


def _collect_annotated_images(label_dir):
    """Return (image_paths, xml_paths) for .jpg/.xml pairs one level below `label_dir`."""
    img_paths = []
    xml_paths = []
    for entry in os.listdir(label_dir):
        sub_dir = os.path.join(label_dir, entry)
        if not os.path.isdir(sub_dir):
            continue
        for data_name in os.listdir(sub_dir):
            if not data_name.endswith('.jpg'):
                continue
            xml_path = os.path.join(sub_dir, os.path.splitext(data_name)[0] + '.xml')
            if os.path.exists(xml_path):
                img_paths.append(os.path.join(sub_dir, data_name))
                xml_paths.append(xml_path)
    return img_paths, xml_paths
def count_jpg_files(path):
    """Count the ``.jpg`` files under `path` that have a sibling ``.xml``.

    The whole tree below `path` is walked; an image is counted only when a
    file with the same stem and the ``.xml`` extension exists in the same
    directory.

    Args:
        path: Root directory of the walk.

    Returns:
        The number of annotated ``.jpg`` files (0 when nothing is found).
    """
    return sum(
        1
        for folder, _sub_dirs, file_names in os.walk(path)
        for file_name in file_names
        if file_name.endswith('.jpg')
        and os.path.exists(
            os.path.join(folder, os.path.splitext(file_name)[0] + '.xml')
        )
    )

# Script entry point: show the label folders, ask for one, then sample from it.
if __name__ == "__main__":
    # Root directory whose entries are label names.
    source_dataset_path = '/data/personal/chz/find_allimgs_label/test'
    # Label names; this list is not referenced by the code below.
    use_labels = ["zsd_m","zsd_l","fhz_h","fhz_f","kk_f","kk_h","fhz_bs", "fhz_ycn","fhz_wcn","fhz_red_h", "fhz_green_f", "fhz_m", "bs_ur", "bs_ul", "bs_up", "bs_down", "fhz_ztyc", "bs_right", "bs_left", "bs_dl", "bs_dr", "kgg_ybh", "kgg_ybf", "yljdq_flow", "yljdq_stop"]
    print_file_structure(source_dataset_path, "")
    print('------------------------------------')
    # The typed label is validated inside from_dataset_get_data_label.
    label = input("请您根据上列中的test菜单,选取您想要的标签:")
    print('------------------------------------')
    from_dataset_get_data_label(source_dataset_path, label)
    

🍉二、自动从指定 MinIO 拉取图片并上传到另一台 MinIO

import minio
import pymysql
import openpyxl
import os 

def get_data_from_mysql():
    """Fetch the image URL column of every row in the ``CorrectPoint`` table.

    Returns:
        A list holding the value at column index 15 of each row returned by
        ``SELECT * FROM CorrectPoint``.
    """
    # The port was left blank in the original snippet (a syntax error).
    # 3306 is the MySQL default -- TODO confirm for this server. `user` and
    # `passwd` are redacted placeholders that must be filled in.
    conn = pymysql.connect(host="10.168.1.94", user="", passwd="", db="RemotePatrolDB", port=3306, charset="utf8")
    try:
        # The cursor context manager closes the cursor even if the query fails.
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM CorrectPoint;")
            rows = cur.fetchall()
    finally:
        conn.close()
    # NOTE(review): index 15 depends on the table's column order; presumably
    # it is the image URL/path column -- verify against the schema.
    return [row[15] for row in rows]

def save_for_excel(df):
    """Write every row of `df` to the active sheet of a new workbook.

    Args:
        df: Iterable of rows; each row is appended to the sheet as-is.

    The workbook is saved as ``文件名.xlsx`` relative to the working directory.
    """
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    for record in df:
        sheet.append(record)
    workbook.save("文件名.xlsx")


# Pull images from MinIO
def load_data_minio(bucket: str, imageUrls):
    """Download the objects named by `imageUrls` into ``imageUrlFromminIO/``.

    Only the last path component of each URL is used, both as the object name
    inside `bucket` and as the local file name.

    Args:
        bucket: Name of the source bucket.
        imageUrls: Iterable of URL/path strings; empty or ``None`` entries are
            skipped.

    Returns:
        ``None`` when the bucket does not exist or nothing was downloaded;
        otherwise the ``data`` attribute of the last response, read after its
        stream has been written to disk (the original return value, kept for
        backward compatibility).
    """
    minio_conf = {
        'endpoint': '10.168.1.96:9000',
        'access_key': '',
        'secret_key': '',
        'secure': False
    }
    client = minio.Minio(**minio_conf)
    if not client.bucket_exists(bucket):
        return None

    root_path = os.path.join("imageUrlFromminIO")
    # open() below fails when the target directory is missing.
    os.makedirs(root_path, exist_ok=True)

    last_data = None
    for imageUrl in imageUrls:
        if not imageUrl:
            # Rows without a stored URL cannot be split into an object name.
            continue
        object_name = imageUrl.split('/')[-1]
        response = client.get_object(bucket, object_name)
        try:
            save_path = os.path.join(root_path, object_name)
            with open(save_path, 'wb') as file_data:
                for chunk in response.stream(32 * 1024):
                    file_data.write(chunk)
            last_data = response.data
        finally:
            # Give the HTTP connection back to the pool after every object.
            response.close()
            response.release_conn()
    return last_data

# Upload images to MinIO
def up_data_minio(bucket: str, image_Urls_path='imageUrlFromminIO'):
    """Upload every regular file of `image_Urls_path` to `bucket`.

    Each file is stored under its own file name with the content type
    ``image/jpeg``.

    Args:
        bucket: Name of the destination bucket.
        image_Urls_path: Local directory holding the files to upload.
    """
    # TODO: minio_conf is the only thing that has to be edited per deployment.
    minio_conf = {
        'endpoint': '192.168.120.188',
        'access_key': '',
        'secret_key': '',
        'secure': False
    }
    # One client is enough for the whole batch; it does not depend on the file.
    client = minio.Minio(**minio_conf)
    for im_name in os.listdir(image_Urls_path):
        local_path = os.path.join(image_Urls_path, im_name)
        if not os.path.isfile(local_path):
            # fput_object uploads a single file; skip sub-directories.
            continue
        # Same call shape as:
        # client.fput_object('mybucket', 'myobject.jpg', '/path/to/myobject.jpg', content_type='image/jpeg')
        client.fput_object(bucket_name=bucket, object_name=im_name,
                           file_path=local_path,
                           content_type='image/jpeg')

def download():
    """Read the image URLs from MySQL and pull those images from MinIO."""
    # Step 1: fetch the URL list from the database.
    urls_from_db = get_data_from_mysql()
    # Step 2: download the images from the MinIO server at 10.168.1.96.
    fetched = load_data_minio("test", urls_from_db)
    print(type(fetched))

def upload():
    """Push the previously downloaded images to the destination MinIO."""
    # Step 3: upload the local copies to the other server's MinIO.
    up_data_minio(bucket="test", image_Urls_path='imageUrlFromminIO')

# Script entry point: run the download step; the upload step is opt-in.
if __name__ == "__main__":
    # Pull step
    download()
    # Push step (uncomment to run)
    # upload()
    # The bare string below is kept verbatim. It records the SQL used to
    # rewrite the ImagePath column so it points at the new MinIO address.
    '''
    用于批量修改数据库ImagePath字段信息,替换为自己的ip。
    ---
    UPDATE CorrectPoint SET ImagePath=REPLACE(ImagePath, '10.168.1.96', '192.168.120.188');
    '''    

    

🍉三、目标检测画出中文框并且自动红底白字

需要把字体文件 simsun.ttc 放到本地目录(下面的代码通过 ImageFont.truetype 加载它):

def cv2AddChineseText(self, img_ori, text, p1, box_color, textColor=(255, 255, 255), textSize=17):
        """Draw `text` on a filled label box near `p1` and return the new image.

        OpenCV's own text drawing cannot render CJK glyphs, so the text is
        rendered with Pillow using the ``simsun.ttc`` font.

        Args:
            img_ori: OpenCV image as a ``numpy.ndarray``. The filled label
                box is drawn onto this array in place by ``cv2.rectangle``.
            text: Text to render.
            p1: ``(x, y)`` anchor point. The label goes above it when there
                is room, otherwise just below it.
            box_color: Fill colour of the label box, passed to OpenCV.
            textColor: Text colour, passed to Pillow.
            textSize: Font size passed to ``ImageFont.truetype``.

        Returns:
            A new ``numpy.ndarray`` containing the box and the text.

        Raises:
            TypeError: If `img_ori` is not a ``numpy.ndarray``.
        """
        if not isinstance(img_ori, np.ndarray):
            # cv2.rectangle below needs an ndarray; fail with a clear message
            # instead of a NameError further down.
            raise TypeError("img_ori must be a numpy.ndarray (OpenCV image)")
        img_width = img_ori.shape[1]

        fontStyle = ImageFont.truetype(
            "simsun.ttc", textSize, encoding="utf-8")

        # Measure the text on a 1x1 scratch canvas so the full frame is not
        # converted just to obtain a drawing object.
        scratch = ImageDraw.Draw(Image.new("RGB", (1, 1)))
        if hasattr(scratch, "textbbox"):
            # ImageDraw.textsize was removed in Pillow 10; textbbox replaces it.
            _, _, right, bottom = scratch.textbbox((0, 0), text, font=fontStyle)
            text_width, text_height = int(round(right)), int(round(bottom))
        else:
            text_width, text_height = scratch.textsize(text, font=fontStyle)

        fits_right = p1[0] + text_width + 3 < img_width
        fits_above = p1[1] - text_height - 3 >= 0
        # Plain-int tuples: cv2.rectangle rejects lists and some numpy scalars.
        left = int(p1[0] + 3 if fits_right else img_width - text_width)
        top = int(p1[1] - text_height - 3 if fits_above else p1[1] + 3)
        top_left = (left, top)
        bottom_right = (left + text_width, top + text_height)

        image = cv2.rectangle(img_ori, top_left, bottom_right, box_color, -1, cv2.LINE_AA)  # filled
        img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        ImageDraw.Draw(img).text(top_left, text, textColor, font=fontStyle)
        # Convert back to the OpenCV channel order.
        return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)

    def draw_boxs(self, boxes, image):
        """Draw every detection in `boxes` with its caption and return the image.

        Args:
            boxes: Iterable of detections. Items 0-3 are combined into corner
                coordinates as ``(b[0], b[1], b[0] + b[2], b[1] + b[3])``,
                item 4 indexes ``self.labels`` and item 5 is the score shown
                in the caption.
            image: Image the rectangles and captions are drawn on.

        Returns:
            The image after all rectangles and captions have been drawn.
        """
        thickness = 3
        caption_color = (255, 255, 255)
        frame_color = (58, 56, 255)

        for det in boxes:
            corners = [det[0], det[1], det[2] + det[0], det[3] + det[1]]
            class_name = self.labels[det[4]]
            score = round(det[5], 4)

            xyxy = np.array(corners[:4], dtype=np.int32)
            top_left = (xyxy[0], xyxy[1])
            bottom_right = (xyxy[2], xyxy[3])
            image = cv2.rectangle(image, top_left, bottom_right, frame_color, thickness)

            caption = '%s: %.2f' % (self.get_desc(class_name), score)
            image = self.cv2AddChineseText(image, caption, top_left, frame_color, caption_color)
        return image

🍉四、标注得到的xml自动转成txt

使用 labelImg 标注得到的文件是 xml 格式,无法直接用于 YOLO 训练,所以需要用自动转换工具把 xml 全部转换为 txt。

请确保目录结构如下:

入职算法工程师后敲的非常有趣使用的小工具-LMLPHP

import os
import xml.etree.ElementTree as ET
import cv2
import random
from tqdm import tqdm
from multiprocessing import Pool
import numpy as np
import shutil
'''
优化之前:
1.把函数路径改为新的数据集,先运行一次,生成txt;
2.把新的数据集Images Annotations labels都手动放入 原生数据集;
3.再把路径改回来原生数据集,再运行一次,生成txt;
问题:
(1)txt不是追加模式,虽然会在第三步被覆盖掉,但重复执行没必要。
(2)有很多地方类似(1)其实是运行了两次的。
优化之后:
1.把函数路径改为新的数据集,运行一次,完成!
'''
# Seed the module-level RNG so that the random.sample call in
# Tools_xml2yolo.__init__ draws the same train indices on every run.
random.seed(0)
class Tools_xml2yolo(object):
    """Convert Pascal-VOC style XML annotations into YOLO ``.txt`` labels.

    For every file listed in `anno_path` the matching image is located in
    `img_path`, its size is read, and each ``<object>`` whose name is in
    `classes` becomes a ``"<class id> <x> <y> <w> <h>"`` line in a same-named
    ``.txt`` under `label_path`. Image paths are also written to
    ``train.txt`` / ``test.txt`` inside `themeFIle`.

    Constructing an instance opens (and, unless `the_data_is_new` is true,
    truncates) ``train.txt`` and ``test.txt``. ``run()`` closes them when it
    finishes; call ``close()`` yourself if ``run()`` is never called.
    """

    # Extensions probed, in this order, when pairing an annotation with an image.
    IMG_EXTENSIONS = ('.jpg', '.JPG', '.PNG', '.png', '.jpeg')

    def __init__(self,
                 img_path=r"ft_220/images",
                 anno_path=r"ft_220/annotations_xml",
                 label_path=r"ft_220/labels",
                 themeFIle='ft_220',
                 classes=None,
                 the_data_is_new=False,
                 train_percent=1,
                 ) -> None:
        """Set up paths, open the split lists and choose the train indices.

        Args:
            img_path: Directory holding the images.
            anno_path: Directory holding the XML files. An empty string means
                "images only": ``run()`` then creates empty label files.
            label_path: Directory that receives the ``.txt`` labels; created
                when missing.
            themeFIle: Dataset root that holds ``train.txt`` / ``test.txt``
                and receives the files moved by ``moveNewData``.
            classes: Class names; a name's position is its YOLO class id.
                Defaults to ``[""]``.
            the_data_is_new: When true the split lists are opened in append
                mode and ``run()`` finishes by calling ``moveNewData``.
            train_percent: Fraction (0..1) of the annotation files whose
                images go to ``train.txt``; the rest go to ``test.txt``.

        Raises:
            ValueError: If `train_percent` is outside ``[0, 1]``.
        """
        if not 0 <= train_percent <= 1:
            raise ValueError("train_percent must be between 0 and 1")

        self.img_path = img_path
        self.anno_path = anno_path
        self.label_path = label_path
        self.the_data_is_new = the_data_is_new
        # A fresh list per instance instead of a shared mutable default.
        self.classes = [""] if classes is None else classes
        self.txt_path = themeFIle

        # List the annotations before touching train/test.txt so that a wrong
        # anno_path does not truncate the existing split lists.
        self.files = os.listdir(self.anno_path) if self.anno_path else []
        num = len(self.files)
        self.train_list = random.sample(range(num), int(num * train_percent))
        print('len train', len(self.train_list))

        os.makedirs(self.label_path, exist_ok=True)
        self._open_split_files('a' if the_data_is_new else 'w')

    def _open_split_files(self, mode):
        """Open ``test.txt`` and ``train.txt`` under `txt_path` with `mode`."""
        self.ftest = open(os.path.join(self.txt_path, 'test.txt'), mode)
        self.ftrain = open(os.path.join(self.txt_path, 'train.txt'), mode)

    def close(self):
        """Close the train/test list files if they are still open."""
        for handle in (getattr(self, 'ftrain', None), getattr(self, 'ftest', None)):
            if handle is not None and not handle.closed:
                handle.close()

    def resi(self, num):
        """Return ``abs(num)`` as a string with exactly six decimals.

        Fixed-point formatting is used so that tiny values do not turn into
        scientific notation (e.g. ``1e-05``) inside the label file.
        """
        return "{:.6f}".format(abs(round(num, 6)))

    def convert(self, size, box):
        """Turn a pixel box into normalised YOLO ``(x, y, w, h)`` strings.

        Args:
            size: ``(image width, image height)`` in pixels.
            box: ``(xmin, xmax, ymin, ymax)`` in pixels.

        Returns:
            Tuple of four strings: centre x, centre y, width and height, each
            divided by the matching image dimension.
        """
        dw = 1. / size[0]
        dh = 1. / size[1]
        x = (box[0] + box[1]) / 2.0  # centre on the x axis
        y = (box[2] + box[3]) / 2.0  # centre on the y axis
        w = box[1] - box[0]
        h = box[3] - box[2]
        return (self.resi(x * dw), self.resi(y * dh), self.resi(w * dw), self.resi(h * dh))

    def _find_image(self, name):
        """Return the path of the existing image sharing `name`'s stem, or None."""
        stem = os.path.splitext(name)[0]
        for ext in self.IMG_EXTENSIONS:
            candidate = os.path.join(self.img_path, stem + ext)
            if os.path.exists(candidate):
                return candidate
        return None

    def _read_image_size(self, image_path):
        """Return ``(width, height)`` of the image, or ``(None, None)`` on failure.

        A JPEG file that does not end with the end-of-image marker is
        re-encoded in place before its size is returned.
        """
        try:
            # np.fromfile + imdecode is the original read path; keep it.
            raw = np.fromfile(image_path, dtype=np.uint8)
            image = cv2.imdecode(raw, -1)
            if image is None:
                raise ValueError("image could not be decoded")
            # Only JPEG files end with FF D9; running this check on a PNG
            # would overwrite the PNG with JPEG data.
            is_jpeg = os.path.splitext(image_path)[1].lower() in ('.jpg', '.jpeg')
            if is_jpeg and raw[-2:].tobytes() != b'\xff\xd9':
                print('JPEG File collapse:', image_path)
                cv2.imencode(".jpg", image)[1].tofile(image_path)
                print('----------Rewrite & Read image successfully----------')
            h, w = image.shape[:2]
            return w, h
        except Exception as exc:  # best effort: report the file and carry on
            print(image_path, exc)
            return None, None

    def process(self, name):
        """Write the YOLO label file for one annotation file.

        Args:
            name: File name (with any extension) whose stem identifies the
                XML file in `anno_path` and the image in `img_path`.

        Nothing is written when the image is missing or unreadable, when the
        XML has no ``<object>``, or when a converted value exceeds 1.0.
        """
        stem = os.path.splitext(name)[0]
        iter_anno_path = os.path.join(self.anno_path, stem + ".xml")
        iter_txt_path = os.path.join(self.label_path, stem + ".txt")
        root = ET.parse(iter_anno_path).getroot()

        iter_image_path = self._find_image(name)
        if iter_image_path is None:
            print(os.path.join(self.img_path, stem))
            print('wh is none')
            return

        w, h = self._read_image_size(iter_image_path)
        if w is None or h is None:
            print('wh is none')
            return

        if not root.findall('object'):
            print('count=0')
            print(os.path.basename(iter_image_path))
            return

        lines = []
        for obj in root.iter('object'):
            cls = obj.find('name').text
            if cls not in self.classes:
                # Unknown class names are reported and left out of the label.
                print(cls)
                continue
            cls_id = self.classes.index(cls)
            xmlbox = obj.find('bndbox')
            b = (float(xmlbox.find('xmin').text), float(xmlbox.find('xmax').text),
                 float(xmlbox.find('ymin').text), float(xmlbox.find('ymax').text))
            bb = self.convert((w, h), b)
            if any(float(a) > 1.0 for a in bb):
                # A box larger than the image means the XML and the image
                # disagree; give up on this file.
                print(iter_anno_path + "wrong xywh", bb)
                return
            lines.append(str(cls_id) + " " + " ".join(bb) + '\n')

        with open(iter_txt_path, "w") as out_file:
            out_file.writelines(lines)

    def moveNewData(self, ):
        """Move all images, XML files and labels into the dataset root.

        Every entry of `img_path`, `anno_path` and `label_path` is moved to
        ``images``, ``Annotations`` and ``labels`` under `txt_path`. Each
        directory is handled on its own, so the three listings need neither
        the same length nor the same order. Destination folders are created
        when missing.
        """
        transfers = (
            (self.img_path, "images"),
            (self.anno_path, "Annotations"),
            (self.label_path, "labels"),
        )
        for src_dir, sub_name in transfers:
            if not src_dir:
                continue
            dst_dir = os.path.join(self.txt_path, sub_name)
            if os.path.abspath(src_dir) == os.path.abspath(dst_dir):
                # Already in place; nothing to move.
                continue
            os.makedirs(dst_dir, exist_ok=True)
            for entry in os.listdir(src_dir):
                shutil.move(os.path.join(src_dir, entry), os.path.join(dst_dir, entry))

    def run(self,):
        """Convert every annotation, write the split lists, optionally move data.

        Files are processed one after another. The earlier multiprocessing
        Pool call is gone: it passed ``args=(name)`` (a string, not a tuple)
        and needed to pickle an instance that holds open files, so it never
        did any work, while every file was already processed synchronously.
        """
        if self.ftrain.closed or self.ftest.closed:
            # A previous run() closed the lists; append to them on a re-run.
            self.close()
            self._open_split_files('a')
        try:
            for name in tqdm(self.files):
                self.process(name)

            train_indexes = set(self.train_list)  # O(1) membership tests
            for i, name in enumerate(self.files):
                iter_image_path = self._find_image(name)
                if iter_image_path is None:
                    # Do not list an image that does not exist on disk.
                    print('image not found for', name)
                    continue
                target = self.ftrain if i in train_indexes else self.ftest
                target.write(iter_image_path + "\n")

            # Images without any XML: create empty label files for them.
            if self.anno_path == '':
                for img_name in os.listdir(self.img_path):
                    txt_name = os.path.splitext(os.path.basename(img_name))[0] + '.txt'
                    txt_file = os.path.join(self.label_path, txt_name)
                    if not os.path.exists(txt_file):
                        with open(txt_file, 'w'):
                            pass
                    self.ftrain.write(os.path.join(self.img_path, img_name) + "\n")

            if self.the_data_is_new:
                # NOTE(review): the paths written above still point at the
                # pre-move location of the images -- confirm this is intended.
                self.moveNewData()
        finally:
            self.close()

# Script entry point: convert one dataset's XML annotations to YOLO labels.
if __name__ == '__main__':
    # tool = Tools_xml2yolo()
    tool = Tools_xml2yolo(
        img_path='datasets/jzl_zhoushan_train/images/',
        anno_path='datasets/jzl_zhoushan_train/Annotations/',
        label_path='datasets/jzl_zhoushan_train/labels/',
        themeFIle='datasets/jzl_zhoushan_train/',
        classes=["zsd_m","zsd_l","fhz_h","fhz_f","kk_f","kk_h","fhz_bs", "fhz_ycn","fhz_wcn","fhz_red_h", "fhz_green_f", "fhz_m", "bs_ur", "bs_ul", "bs_up", "bs_down", "fhz_ztyc", "bs_right", "bs_left", "bs_dl", "bs_dr", "kgg_ybh", "kgg_ybf", "yljdq_flow", "yljdq_stop"],
        the_data_is_new=False)

    # themeFIle is the original (main) dataset.
    # The first three arguments point at the newly added subset.
    # the_data_is_new=True: run() ends by moving images / Annotations / labels
    # into the matching folders of the original dataset.
    # By default the XML files are converted to the txt format used for YOLO training.
    tool.run()

🍉五、 使用yolo自动推理图片得到推理结果转换为训练所需xml

import os
import torch
import xml.etree.ElementTree as ET
from PIL import Image

# Maps each class code to its human-readable (Chinese) description.
# detect_and_save writes the description of a matched class into the
# <name> element of the generated XML.
class_dict = {
    'zsd_m': '指示灯灭',
    'zsd_l': '指示灯亮',
    'fhz_h': '分合闸-合',
    'fhz_f': '分合闸-分',
    'fhz_ztyc': '分合闸-状态异常',
    'fhz_bs': '旋转把手',
    'kk_f': '空气开关-分',
    'kk_h': '空气开关-合',
    'fhz_ycn': '分合闸-已储能',
    'fhz_wcn': '分合闸未储能',
    'fhz_red_h': '分合闸-红-合',
    'fhz_green_f': '分合闸-绿-分',
    'fhz_m': '分合闸-灭',
    'bs_ur': '把手-右上',
    'bs_ul': '把手-左上',
    'bs_up': '把手-上',
    'bs_down': '把手-下',
    'bs_right': '把手-右',
    'bs_left': '把手-左',
    'bs_dl': '把手-左下',
    'bs_dr': '把手-右下',
    "kgg_ybf": "开关柜-压板分",
    "kgg_ybh": "开关柜-压板合",
    "ddzsd_green":"带电指示灯-绿色",
    "ddzsd_red":"带电指示灯-红色"
}

def detect_and_save(model_path, folder_path, iter_start_index):
    """Run the detector on the images of a folder and write one XML per image.

    Images are the ``.jpg`` / ``.jpeg`` / ``.png`` files of `folder_path`
    (case-insensitive), visited in sorted order. For every image an
    ``annotation`` XML with the folder, file name, size and one ``object``
    per detection whose class is a key of ``class_dict`` is written next to
    the image, using the image's stem and the ``.xml`` extension.

    Args:
        model_path: Path of the model file handed to ``torch.load``.
        folder_path: Directory that holds the images and receives the XML.
        iter_start_index: Images at sorted positions ``0..iter_start_index``
            (inclusive) are skipped, which allows resuming an interrupted
            run. Pass ``-1`` to process every image. The position is counted
            over image files only, so the XML files this function writes
            into the same folder do not shift it between runs.
    """
    # NOTE(review): `results.xyxy` below is the result API of a YOLOv5
    # AutoShape model. torch.load only returns such a callable model when the
    # whole module was pickled; a standard training checkpoint would load as
    # a dict and fail on .eval(). Confirm how best.pt was saved.
    model = torch.load(model_path, map_location=torch.device('cpu'))

    # Switch the model to evaluation mode.
    model.eval()

    image_names = sorted(
        entry for entry in os.listdir(folder_path)
        if entry.lower().endswith(('.jpg', '.jpeg', '.png'))
    )

    for ind, file_name in enumerate(image_names):
        if ind <= iter_start_index:
            continue

        img_path = os.path.join(folder_path, file_name)
        # The context manager releases the file handle after inference.
        with Image.open(img_path) as img:
            with torch.no_grad():
                results = model(img)
            img_width, img_height = img.width, img.height
            img_depth = len(img.getbands())

        # Class index -> class code table; presumably exposed as `names` on
        # the results or on the model -- verify for the model in use.
        names = getattr(results, 'names', None) or getattr(model, 'names', None)
        if not names:
            print('no class names available, objects are skipped for', file_name)

        # Build the XML tree.
        root = ET.Element('annotation')
        ET.SubElement(root, 'folder').text = os.path.basename(folder_path)
        ET.SubElement(root, 'filename').text = file_name
        size = ET.SubElement(root, 'size')
        ET.SubElement(size, 'width').text = str(img_width)
        ET.SubElement(size, 'height').text = str(img_height)
        ET.SubElement(size, 'depth').text = str(img_depth)

        for result in results.xyxy[0]:
            # The last element is a numeric class index, while class_dict is
            # keyed by class code, so translate the index first.
            try:
                class_code = names[int(result[-1])] if names else None
            except (KeyError, IndexError):
                class_code = None
            if class_code not in class_dict:
                continue
            obj = ET.SubElement(root, 'object')
            # NOTE(review): the description text is written as the object
            # name, as in the original code. If the XML feeds an XML-to-YOLO
            # step that matches class codes, `class_code` may be the value
            # wanted here -- confirm.
            ET.SubElement(obj, 'name').text = class_dict[class_code]
            bndbox = ET.SubElement(obj, 'bndbox')
            ET.SubElement(bndbox, 'xmin').text = str(int(result[0]))
            ET.SubElement(bndbox, 'ymin').text = str(int(result[1]))
            ET.SubElement(bndbox, 'xmax').text = str(int(result[2]))
            ET.SubElement(bndbox, 'ymax').text = str(int(result[3]))

        # Save the XML next to the image; UTF-8 keeps the Chinese names readable.
        xml_path = os.path.join(folder_path, os.path.splitext(file_name)[0] + '.xml')
        ET.ElementTree(root).write(xml_path, encoding='utf-8')

# Script entry point: annotate the images of the given folder with ./best.pt,
# passing 180 as the resume index.
if __name__ == "__main__":
    detect_and_save('./best.pt', './rmwrite_zhoushan/rmwrite_zhoushan', iter_start_index=180)
11-15 22:26