手势识别 trainer

本项目用 PaddlePaddle 教程传统方式,通过 fluid 的 trainer 进行手势识别。

手势识别项目中,训练有两种实现方式:trainer 和 exe;数据集也有两种划分方式:k-fold 划分和按比例划分;同时还有两种网络结构:ANN 和 CNN。
本项目默认使用 "rate 0.3" 作为数据集划分方式,使用 trainer 进行训练,ANN 作为神经网络(可以通过简单的修改注释来进行 k-fold 或 CNN 的训练)

另一个项目 EXE_手势识别 使用 exe 进行训练,CNN 作为神经网络。

下载安装命令

## CPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/cpu paddlepaddle

## GPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/gpu paddlepaddle-gpu

注意

本项目代码需要使用GPU环境来运行:

用PaddlePaddle识别手势-LMLPHP
 

并且检查相关参数设置, 例如use_gpu, fluid.CUDAPlace(0)等处是否设置正确.

In[1]
# 解压数据集
!cd /home/aistudio/data/data2182 && unzip -qo Dataset.zip
!cd /home/aistudio/data/data2182/Dataset && rm -f */.DS_Store # 删除无关文件
In[2]
import os
import time
import random
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
try:
    import paddle.fluid.contrib.trainer as tnr  # for mac
except:
    import paddle.fluid as tnr  # for AIStudio

np.random.seed(int(time.clock() * 1e6))
In[3]
# 一些参数
config = {
    'data_path': '/home/aistudio/data/data2182/Dataset', # 数据集目录
    'num_classes': 10,          # 总类别数
    'shuffle_size': 100,        # 乱序缓冲区大小
    'batch_size': 10,           # 训练批处理大小
    'num_epochs': 30,           # 总训练轮数
    'log_steps': 50,            # 显示 log 的频率
    'feed_order': ['x', 'y'],   # 数据进入 trainer 的顺序
    'param_path': 'model',      # 是否继续训练?None: 重新训练;参数缓存路径: 继续训练
    'model_save_dir': 'model',  # 参数缓存路径
    'model_freeze_dir': 'infer_model',  # 模型固化路径
    'use_gpu': True,    # 是否使用 GPU 训练
}
In[4]
# 统一的 logger 配置
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging

logger = None

def init_log_config():
    """
    初始化日志相关配置
    :return:
    """
    global logger
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    log_path = os.path.join(os.getcwd(), 'logs')
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    log_name = os.path.join(log_path, 'train.log')
    sh = logging.StreamHandler()
    fh = logging.FileHandler(log_name, mode='w')
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    fh.setFormatter(formatter)
    sh.setFormatter(formatter)
    logger.handlers = []
    logger.addHandler(sh)
    logger.addHandler(fh)

init_log_config()
 

定义数据读取器

以下代码定义两种数据读取器:rate_readers 和 k_fold_readers。

rate_readers 根据参数 rate 划分训练集和测试集
    如 rate=0.3,则训练集占70%,测试集占30%
    通过对打乱的数据集索引进行切分实现。

k_fold_readers 根据参数 k 决定训练集测试集划分比例,以及总轮数
    如 k=10,则共生成 10 组 reader,每组 reader 中训练集占90%,测试集占10%
    通过测试集的平移来覆盖整个数据集,保证每个数据至少在测试集中出现一次。

In[5]
# 数据集读取代码

def read_an_image(path):
    ''' 读取一张图片,转为 np.array 并归一化 '''
    img = np.vstack(np.array(Image.open(path).resize((64, 64), Image.ANTIALIAS)))\
        .reshape(1, 3, 64, 64)\
        .astype('float32')
    img = img / 255.0 * 2.0 - 1
    return img

def get_dataset_size():
    '''
    读取数据集各类别的数据量
    :return 长度为 num_classes 的列表,每一项为一个类别的数据量
    '''
    sizes = []
    for i in range(config['num_classes']):
        sizes.append(len(os.listdir(os.path.join(config['data_path'], str(i)))))
    return sizes

def create_rate_readers(rate):
    """
    根据 rate 创建 train_reader 和 test_reader
    :param rate: 测试集所占比例,取值范围 [0.01, 0.5]
    :return: train_reader 和 test_reader
    """
    if rate > 0.5 or rate < 0.01:
        raise ValueError("rate should in [0.01, 0.5]")
    dataset_size = get_dataset_size()
    train_list = []
    test_list = []
    for i in range(config['num_classes']):
        size = dataset_size[i]
        train_size = size - int(size * rate)
        data = np.arange(size)
        np.random.shuffle(data)
        train_list.append(data[:train_size])
        test_list.append(data[train_size:])

    def train_reader():
        mark_v = 0  # 记录数据耗尽的类别数
        chlist = [0 for i in range(config['num_classes'])]  # 记录每个类别的当前 index
        while mark_v < config['num_classes']:
            for i in range(config['num_classes']):
                if chlist[i] >= len(train_list[i]):
                    mark_v += 1
                    continue

                idx = train_list[i][chlist[i]]
                chlist[i] += 1

                filename = os.listdir(os.path.join(config['data_path'], str(i)))[idx]
                yield read_an_image(os.path.join(os.path.join(config['data_path'], str(i)), filename)), i

    def test_reader():
        mark_v = 0
        chlist = [0 for i in range(config['num_classes'])]
        while mark_v < config['num_classes']:
            for i in range(config['num_classes']):
                if chlist[i] >= len(test_list[i]):
                    mark_v += 1
                    continue

                idx = test_list[i][chlist[i]]
                chlist[i] += 1

                filename = os.listdir(os.path.join(config['data_path'], str(i)))[idx]
                yield read_an_image(os.path.join(os.path.join(config['data_path'], str(i)), filename)), i

    return train_reader, test_reader



def create_k_fold_readers(k):
    """
    生成 k 个 train_reader 和 test_reader,按照 1/k 的比例划分,k 组 reader 使得测试集覆盖整个数据集
    :param k: 划分数据集的比例以及生成 reader 的数量
    :return: k_fold_reader_creator, 调用 k 次以获取所有数据
    """
    assert isinstance(k, int)
    if k > 100 or k < 1:
        raise ValueError("k should be an Integer in [1, 100]")
    dataset_size = get_dataset_size()
    domain_bottom = []
    domain_top = []
    for i in range(config['num_classes']):
        domain_bottom.append(0)
        domain_top.append(dataset_size[i] // k + 1)

    for term in range(k):
        def train_reader():
            mark_v = 0  # 记录数据耗尽的类别数
            chlist = [0 for i in range(config['num_classes'])]  # 记录每个类别的当前 index
            while mark_v < config['num_classes']:
                for i in range(config['num_classes']):
                    if chlist[i] >= dataset_size[i]:
                        mark_v += 1
                        continue

                    if domain_bottom[i] <= chlist[i] < domain_top[i]:
                        chlist[i] = domain_top[i]
                        continue

                    filename = os.listdir(os.path.join(config['data_path'], str(i)))[chlist[i]]
                    yield read_an_image(os.path.join(os.path.join(config['data_path'], str(i)), filename)), i
                    chlist[i] += 1

        def test_reader():
            mark_v = 0  # 记录数据耗尽的类别数
            chlist = [domain_bottom[i] for i in range(config['num_classes'])]  # 记录每个类别的当前 index
            while mark_v < config['num_classes']:
                for i in range(config['num_classes']):
                    if chlist[i] >= domain_top[i]:
                        mark_v += 1
                        continue

                    filename = os.listdir(os.path.join(config['data_path'], str(i)))[chlist[i]]
                    yield read_an_image(os.path.join(os.path.join(config['data_path'], str(i)), filename)), i
                    chlist[i] += 1

        yield train_reader, test_reader
        for i in range(config['num_classes']):
            domain_bottom[i] = domain_top[i]
            domain_top[i] += (dataset_size[i] // k + 1)
            if term == (k - 2) and domain_top[i] > dataset_size[i]:
                domain_top[i] = dataset_size[i]
 

网络定义

以下代码定义神经网络,为了更方便的修改函数,将 PredictProgram 实现为可调用类,网络定义代码也变得更加简洁。
使用时调用 PredictProgram 即可获得网络结构。

代码定义了两种网络结构:

  1. ANN:双隐层全连接网络 + Dropout
  2. CNN:两个卷积层 + Dropout + 全连接 + bn

两种网络结构均使用 fc(10, softmax) 作为输出层,对 10 种手势进行分类。

In[6]
# 网络定义与配置

class PredictProgram:

    def __init__(self):
        self.predict = None

    def flatten(self):
        self.predict = layers.flatten(self.predict)

    def fc(self, size, act=None):
        self.predict = layers.fc(input=self.predict, size=size, act=act)

    def cp(self, filter_num, filter_size, conv_stride, pool_size, pool_stride, act='relu'):
        self.predict = fluid.nets.simple_img_conv_pool(
            input=self.predict,
            num_filters=filter_num,
            filter_size=filter_size,
            conv_stride=conv_stride,
            pool_size=pool_size,
            pool_stride=pool_stride,
            act=act
        )

    def drop(self, rate):
        self.predict = layers.dropout(self.predict, rate)

    def batch_normalization(self):
        self.predict = layers.batch_norm(self.predict)

    def __call__(self):
        self.predict = layers.data(name='x', shape=[3, 64, 64], dtype='float32')

        # net definition
        # ANN
        self.fc(512, 'relu')
        self.drop(0.3)
        self.fc(256, 'relu')
        self.drop(0.2)
        self.fc(config['num_classes'], 'softmax')

        # CNN
        # self.cp(64, 4, 2, 2, 2)
        # self.cp(64, 4, 1, 2, 2)
        # self.drop(0.3)
        # self.flatten()
        # self.fc(256, 'relu')
        # self.batch_normalization()
        # self.fc(config['num_classes'], 'softmax')

        return self.predict



def train_func():
    program = PredictProgram()
    predict = program()
    label = layers.data(name='y', shape=[1], dtype='int64')
    return [layers.mean(layers.cross_entropy(input=predict, label=label)), predict]

def optimizer():
    return fluid.optimizer.Adam()
 

定义 EventHandler

以下代码定义事件处理器,用来接收训练中的事件并处理,如 EndStepEvent,EndEpochEvent 等。
功能包括:打印 log,验证,保存模型参数。

In[7]
# 事件处理代码

class TrainerEventHandler:
    step = 0
    save_place = None
    trainer = None
    test_reader = None
    feed_order = None

    def __init__(self, feed_order, test_reader, trainer):
        """
        to initialize the EventHandler
        :param save_place: the path to save target files, if it's str, save in file
        as specified directory, if it's a Ploter, plot the picture, if it's None,
        print the metrics out in Console.
        :param trainer: the target trainer, in order to test
        """
        assert isinstance(trainer, tnr.Trainer)
        self.step = 0
        self.metrics = []

        self.trainer = trainer
        self.test_reader = test_reader
        self.feed_order = feed_order

    def __call__(self, event):
        if isinstance(event, tnr.EndEpochEvent):
            # test
            metrics = self.trainer.test(
                reader=self.test_reader,
                feed_order=self.feed_order
            )
            logger.info("In step: %d, test loss: %f", self.step, metrics[0])

            self.trainer.save_params(config['model_save_dir'])
            logger.info("Epoch %d end.", event.epoch)

        if isinstance(event, tnr.EndStepEvent):
            if self.step > 0 and self.step % config['log_steps'] == 0:
                logger.info("In step: %d, train loss: %f", self.step, np.mean(self.metrics))
                self.metrics = []
            else:
                self.metrics.append(event.metrics[0])
            self.step += 1
 

训练代码

以下代码使用 trainer 进行训练,根据数据集划分方式,定义两个方法:random_test 和 k_fold_test,对两种数据集划分方式进行训练。

In[8]
# 训练代码

# 检查预训练参数是否合法
if config['param_path'] and not os.path.isdir(config['param_path']):
    logger.info('config.param_path not exist, setting to None')
    config['param_path'] = None

place = fluid.CUDAPlace(0) if config['use_gpu'] else fluid.CPUPlace()
trainer = tnr.Trainer(
    train_func=train_func,
    place=place,
    param_path=config['param_path'],
    optimizer_func=optimizer
)


def random_test(trainer, rate):
    """
    To conduct a test with the given rate as the portion of the test dataset.
    This test will use the given trainer
    :param trainer: the trainer use to execute the test
    :param rate: The portion of test dataset in the whole dataset.
    """
    assert isinstance(trainer, tnr.Trainer)
    raw_train_reader, raw_test_reader = create_rate_readers(rate)
    train_reader = paddle.batch(
        paddle.reader.shuffle(raw_train_reader, config['shuffle_size']),
        batch_size=config['batch_size']
    )

    test_reader = paddle.batch(
        paddle.reader.shuffle(raw_test_reader, config['shuffle_size']),
        batch_size=config['batch_size']
    )

    trainer.train(
        reader=train_reader,
        num_epochs=config['num_epochs'],
        event_handler=TrainerEventHandler(
            config['feed_order'],
            test_reader,
            trainer
        ),
        feed_order=config['feed_order']
    )

    logger.info('Train end.')


def k_fold_test(trainer, k):
    """
    To conduct a time of k-fold test.
    This will run k times.
    :param trainer: the trainer to conduct the test
    :param k: the time of folding
    """
    assert isinstance(trainer, tnr.Trainer)
    k_reader_creator = create_k_fold_readers(k)

    count = 0

    for raw_train_reader, raw_test_reader in k_reader_creator:
        logger.info("------------------------------------------------")
        logger.info("Round %d begin:", count)
        count += 1
        logger.info('')

        train_reader = paddle.batch(
            paddle.reader.shuffle(raw_train_reader, config['shuffle_size']),
            batch_size=config['batch_size']
        )

        test_reader = paddle.batch(
            paddle.reader.shuffle(raw_test_reader, config['shuffle_size']),
            batch_size=config['batch_size']
        )

        trainer.train(
            reader=train_reader,
            num_epochs=config['num_epochs'],
            event_handler=TrainerEventHandler(
                config['feed_order'],
                test_reader,
                trainer
            ),
            feed_order=config['feed_order']
        )

    logger.info('Train end.')


random_test(trainer, 0.3)
# k_fold_test(trainer, 2)
In[9]
# 模型测试代码

def test():
    """ 模型测试函数 """

    # 生成测试数据读取器
    _, raw_test_reader = create_rate_readers(0.3)
    test_reader = paddle.batch(
        paddle.reader.shuffle(raw_test_reader, config['shuffle_size']),
        batch_size=config['batch_size']
    )

    metrics = trainer.test(reader=test_reader, feed_order=config['feed_order'])
    logger.info("Test loss: %s", metrics[0])

test()
2019-08-31 20:37:24,106 - <ipython-input-9-04d2c22354fb>[line:14] - INFO: Test loss: 0.1035391891792026
In[10]
# 模型固化代码
trainer.save_inference_model(config['model_freeze_dir'], ['x', 'y'], [0, 1])
In[34]
# 模型预测代码

def infer():
    """ 模型预测函数 """
    place = fluid.CUDAPlace(0) if config['use_gpu'] else fluid.CPUPlace()
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    # 加载先前固化的模型
    [inference_program, feed_target_names, fetch_list] = fluid.io.load_inference_model(dirname=config['model_freeze_dir'], executor=exe)

    # 生成预测数据读取器
    _, raw_test_reader = create_rate_readers(0.3)
    infer_reader = paddle.batch(
        paddle.reader.shuffle(raw_test_reader, config['shuffle_size']),
        batch_size=1
    )

    img, label = next(infer_reader())[0]
    plt.imshow(((img + 1.0) * 127.0).astype('uint8').reshape((64, 64, 3)))
    plt.show()
    logger.info('Label: %d' % label)
    feed = { feed_target_names[0]: img, feed_target_names[1]: np.array(label).reshape(1, 1) }
    result = exe.run(inference_program, fetch_list=fetch_list, feed=feed)
    logger.info('Predict: %d' % np.argmax(result[1]))


infer()
用PaddlePaddle识别手势-LMLPHP
2019-08-31 20:38:31,375 - <ipython-input-34-4e4176630fe3>[line:22] - INFO: Label: 2
2019-08-31 20:38:31,379 - <ipython-input-34-4e4176630fe3>[line:25] - INFO: Predict: 2

使用AI Studio一键上手实践项目吧: https://aistudio.baidu.com/aistudio/projectdetail/127563

下载安装命令

## CPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/cpu paddlepaddle

## GPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/gpu paddlepaddle-gpu

>> 访问 PaddlePaddle 官网,了解更多相关内容

09-04 16:04