手势识别 trainer
本项目采用 PaddlePaddle 教程的传统方式,通过 fluid 的 trainer 进行手势识别。
手势识别项目中,训练有两种实现方式:trainer 和 exe;数据集也有两种划分方式:k-fold 划分和按比例划分;同时还有两种网络结构:ANN 和 CNN。
本项目默认使用 "rate 0.3" 作为数据集划分方式,使用 trainer 进行训练,ANN 作为神经网络(可以通过简单的修改注释来进行 k-fold 或 CNN 的训练)
另一个项目 EXE_手势识别 使用 exe 进行训练,CNN 作为神经网络。
下载安装命令
## CPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/cpu paddlepaddle
## GPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/gpu paddlepaddle-gpu
注意
本项目代码需要使用GPU环境来运行:
并且检查相关参数设置, 例如use_gpu, fluid.CUDAPlace(0)等处是否设置正确.
# 解压数据集
!cd /home/aistudio/data/data2182 && unzip -qo Dataset.zip
!cd /home/aistudio/data/data2182/Dataset && rm -f */.DS_Store # 删除无关文件
import os
import time
import random
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
# The contrib trainer moved between fluid releases; fall back to the package
# root where older builds (e.g. AI Studio) expose Trainer directly.
try:
    import paddle.fluid.contrib.trainer as tnr  # for mac
except ImportError:  # narrowed from a bare except: only a missing module should fall through
    import paddle.fluid as tnr  # for AIStudio
# Seed NumPy from a high-resolution timer. time.clock() was removed in
# Python 3.8; perf_counter() is its documented replacement. The modulus keeps
# the seed inside NumPy's required [0, 2**32) range.
np.random.seed(int(time.perf_counter() * 1e6) % (2 ** 32))
# Hyper-parameters and runtime configuration
config = {
    'data_path': '/home/aistudio/data/data2182/Dataset',  # dataset root directory
    'num_classes': 10,  # total number of gesture classes
    'shuffle_size': 100,  # shuffle buffer size for the data readers
    'batch_size': 10,  # training batch size
    'num_epochs': 30,  # total number of training epochs
    'log_steps': 50,  # how often (in steps) to log the training loss
    'feed_order': ['x', 'y'],  # order in which data is fed to the trainer
    'param_path': 'model',  # resume training? None: train from scratch; checkpoint dir: resume
    'model_save_dir': 'model',  # checkpoint (parameter) directory
    'model_freeze_dir': 'infer_model',  # frozen inference-model directory
    'use_gpu': True,  # whether to train on the GPU
}
# 统一的 logger 配置
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
logger = None
def init_log_config():
"""
初始化日志相关配置
:return:
"""
global logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_path = os.path.join(os.getcwd(), 'logs')
if not os.path.exists(log_path):
os.makedirs(log_path)
log_name = os.path.join(log_path, 'train.log')
sh = logging.StreamHandler()
fh = logging.FileHandler(log_name, mode='w')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
fh.setFormatter(formatter)
sh.setFormatter(formatter)
logger.handlers = []
logger.addHandler(sh)
logger.addHandler(fh)
init_log_config()
定义数据读取器
以下代码定义两种数据读取器:rate_readers 和 k_fold_readers。
rate_readers 根据参数 rate 划分训练集和测试集
如 rate=0.3,则训练集占70%,测试集占30%
通过对打乱的数据集索引进行切分实现。
k_fold_readers 根据参数 k 决定训练集测试集划分比例,以及总轮数
如 k=10,则共生成 10 组 reader,每组 reader 中训练集占90%,测试集占10%
通过测试集的平移来覆盖整个数据集,保证每个数据至少在测试集中出现一次。
# 数据集读取代码
def read_an_image(path):
    """Read one image, resize it to 64x64 and normalize it to [-1, 1].

    :param path: path of the image file
    :return: float32 ndarray of shape (1, 3, 64, 64) with values in [-1, 1]
    """
    # convert('RGB') makes grayscale/RGBA files load instead of breaking the
    # reshape below. ANTIALIAS was an alias of LANCZOS and was removed in
    # Pillow 10, so use LANCZOS directly.
    raw = Image.open(path).convert('RGB').resize((64, 64), Image.LANCZOS)
    # NOTE(review): this reshapes the HWC byte buffer straight to NCHW without
    # transposing, so the "channels" are scrambled. The default ANN flattens
    # its input, making this harmless, and infer() relies on the same flat
    # layout for display — kept as-is for compatibility. A CNN would want
    # np.asarray(raw).transpose(2, 0, 1) instead.
    img = np.asarray(raw, dtype='float32').reshape(1, 3, 64, 64)
    # Map [0, 255] -> [-1, 1].
    return img / 255.0 * 2.0 - 1
def get_dataset_size():
    """Count the images available for each gesture class.

    :return: list of length config['num_classes']; entry i is the number of
             files in the directory of class i
    """
    return [
        len(os.listdir(os.path.join(config['data_path'], str(label))))
        for label in range(config['num_classes'])
    ]
def create_rate_readers(rate):
    """Create train/test reader callables from a random per-class split.

    :param rate: fraction of each class held out for testing, in [0.01, 0.5]
    :return: (train_reader, test_reader) — reader factories that yield
             (image_array, label) pairs round-robin across the classes
    :raises ValueError: if rate is outside [0.01, 0.5]
    """
    if rate > 0.5 or rate < 0.01:
        raise ValueError("rate should in [0.01, 0.5]")
    dataset_size = get_dataset_size()
    # Cache each class's directory listing once: the split indices below refer
    # to positions in this listing, so re-listing on every yielded image (as
    # the original did) is O(n) per item and can desync the split if the
    # directory contents change during training.
    file_lists = [
        os.listdir(os.path.join(config['data_path'], str(i)))
        for i in range(config['num_classes'])
    ]
    train_list = []
    test_list = []
    for i in range(config['num_classes']):
        size = dataset_size[i]
        train_size = size - int(size * rate)
        indices = np.arange(size)
        np.random.shuffle(indices)
        train_list.append(indices[:train_size])
        test_list.append(indices[train_size:])

    def _make_reader(index_lists):
        # Yield (image, label) round-robin across classes so batches stay
        # roughly class-balanced; stop only when every class is exhausted.
        def reader():
            cursor = [0] * config['num_classes']
            while True:
                exhausted = 0
                for i in range(config['num_classes']):
                    if cursor[i] >= len(index_lists[i]):
                        exhausted += 1
                        continue
                    idx = index_lists[i][cursor[i]]
                    cursor[i] += 1
                    filename = file_lists[i][idx]
                    yield read_an_image(os.path.join(config['data_path'], str(i), filename)), i
                # The original accumulated the exhausted count across passes,
                # which could terminate early and silently drop data when the
                # classes are imbalanced; checking per pass yields every
                # sample exactly once.
                if exhausted == config['num_classes']:
                    break
        return reader

    return _make_reader(train_list), _make_reader(test_list)
def create_k_fold_readers(k):
    """Generate k (train_reader, test_reader) pairs for k-fold validation.

    Each class's file listing is split into k contiguous chunks; fold t uses
    chunk t as the test set and the rest for training, so the k test sets
    together cover the whole dataset.

    :param k: number of folds, an integer in [1, 100]
    :return: generator yielding k (train_reader, test_reader) pairs
    :raises ValueError: if k is outside [1, 100]
    """
    assert isinstance(k, int)
    if k > 100 or k < 1:
        raise ValueError("k should be an Integer in [1, 100]")
    num_classes = config['num_classes']
    dataset_size = get_dataset_size()
    # Cache the listings once; all indices below refer to these lists.
    file_lists = [
        os.listdir(os.path.join(config['data_path'], str(i)))
        for i in range(num_classes)
    ]
    fold_size = [dataset_size[i] // k + 1 for i in range(num_classes)]
    # Clamp the window to the dataset so the file index can never run past
    # the listing (the original only clamped on the second-to-last fold and
    # could raise IndexError, e.g. for k == 1).
    domain_bottom = [0] * num_classes
    domain_top = [min(fold_size[i], dataset_size[i]) for i in range(num_classes)]
    for term in range(k):
        # Snapshot the current fold bounds: the reader closures must not see
        # the in-place updates made for later folds (the original shared the
        # mutable lists, so readers from earlier folds silently shifted after
        # the next call to the generator).
        bottoms = list(domain_bottom)
        tops = list(domain_top)

        def train_reader(bottoms=bottoms, tops=tops):
            # Round-robin over classes, skipping this fold's test window.
            cursor = [0] * num_classes
            while True:
                exhausted = 0
                for i in range(num_classes):
                    if bottoms[i] <= cursor[i] < tops[i]:
                        cursor[i] = tops[i]  # jump over the test chunk
                    if cursor[i] >= dataset_size[i]:
                        exhausted += 1
                        continue
                    filename = file_lists[i][cursor[i]]
                    yield read_an_image(os.path.join(config['data_path'], str(i), filename)), i
                    cursor[i] += 1
                # Per-pass exhaustion check: the original's cumulative count
                # could stop early and drop training data.
                if exhausted == num_classes:
                    break

        def test_reader(bottoms=bottoms, tops=tops):
            # Round-robin over classes inside this fold's test window only.
            cursor = list(bottoms)
            while True:
                exhausted = 0
                for i in range(num_classes):
                    if cursor[i] >= min(tops[i], dataset_size[i]):
                        exhausted += 1
                        continue
                    filename = file_lists[i][cursor[i]]
                    yield read_an_image(os.path.join(config['data_path'], str(i), filename)), i
                    cursor[i] += 1
                if exhausted == num_classes:
                    break

        yield train_reader, test_reader
        # Slide the test window for the next fold, clamped to the dataset end.
        for i in range(num_classes):
            domain_bottom[i] = domain_top[i]
            domain_top[i] = min(domain_top[i] + fold_size[i], dataset_size[i])
网络定义
以下代码定义神经网络,为了更方便的修改函数,将 PredictProgram 实现为可调用类,网络定义代码也变得更加简洁。
使用时调用 PredictProgram 即可获得网络结构。
代码定义了两种网络结构:
- ANN:双隐层全连接网络 + Dropout
- CNN:两个卷积层 + Dropout + 全连接 + bn
两种网络结构均使用 fc(10, softmax) 作为输出层,对 10 种手势进行分类。
# Network definition and configuration
class PredictProgram:
    """Callable builder for the prediction network.

    Calling an instance constructs the layer graph step by step on
    ``self.predict`` and returns the final softmax output. Two architectures
    are provided: an ANN (active) and a CNN (commented out below).
    """

    def __init__(self):
        # Output variable of the most recently appended layer.
        self.predict = None

    def flatten(self):
        """Flatten the current output to 2-D (batch, features)."""
        self.predict = layers.flatten(x=self.predict)

    def fc(self, size, act=None):
        """Append a fully connected layer with the given width/activation."""
        self.predict = layers.fc(input=self.predict, size=size, act=act)

    def cp(self, filter_num, filter_size, conv_stride, pool_size, pool_stride, act='relu'):
        """Append a convolution + max-pool block."""
        self.predict = fluid.nets.simple_img_conv_pool(
            input=self.predict,
            num_filters=filter_num,
            filter_size=filter_size,
            conv_stride=conv_stride,
            pool_size=pool_size,
            pool_stride=pool_stride,
            act=act)

    def drop(self, rate):
        """Append a dropout layer with the given drop probability."""
        self.predict = layers.dropout(x=self.predict, dropout_prob=rate)

    def batch_normalization(self):
        """Append a batch-normalization layer."""
        self.predict = layers.batch_norm(input=self.predict)

    def __call__(self):
        """Build the network and return its output variable."""
        self.predict = layers.data(name='x', shape=[3, 64, 64], dtype='float32')
        # --- ANN: two hidden FC layers with dropout ---
        self.fc(512, 'relu')
        self.drop(0.3)
        self.fc(256, 'relu')
        self.drop(0.2)
        self.fc(config['num_classes'], 'softmax')
        # --- CNN alternative: uncomment (and comment out the ANN above) ---
        # self.cp(64, 4, 2, 2, 2)
        # self.cp(64, 4, 1, 2, 2)
        # self.drop(0.3)
        # self.flatten()
        # self.fc(256, 'relu')
        # self.batch_normalization()
        # self.fc(config['num_classes'], 'softmax')
        return self.predict
def train_func():
    """Build the training program.

    :return: [mean cross-entropy loss, prediction variable] — the indices
             referenced later by save_inference_model.
    """
    network = PredictProgram()
    predict = network()
    label = layers.data(name='y', shape=[1], dtype='int64')
    cost = layers.cross_entropy(input=predict, label=label)
    avg_cost = layers.mean(cost)
    return [avg_cost, predict]
def optimizer():
    """Optimizer factory for the Trainer: Adam with default settings."""
    return fluid.optimizer.Adam()
定义 EventHandler
以下代码定义事件处理器,用来接收训练中的事件并处理,如 EndStepEvent,EndEpochEvent 等。
功能包括:打印 log,验证,保存模型参数。
# Event-handling code
class TrainerEventHandler:
    """Event callback for tnr.Trainer.

    On EndStepEvent: accumulate the training loss and log its running mean
    every config['log_steps'] steps. On EndEpochEvent: run validation on the
    held-out reader, checkpoint the parameters, and log the epoch.
    """

    def __init__(self, feed_order, test_reader, trainer):
        """Initialize the event handler.

        :param feed_order: feed order passed through to trainer.test
        :param test_reader: batched reader used for end-of-epoch validation
        :param trainer: the Trainer being driven (used to test and to save
            parameters)
        """
        # NOTE: the original docstring documented a nonexistent `save_place`
        # parameter and kept dead class-level attributes; both removed.
        assert isinstance(trainer, tnr.Trainer)
        self.step = 0
        self.metrics = []
        self.trainer = trainer
        self.test_reader = test_reader
        self.feed_order = feed_order

    def __call__(self, event):
        if isinstance(event, tnr.EndEpochEvent):
            # Validate, checkpoint, and log at every epoch boundary.
            metrics = self.trainer.test(
                reader=self.test_reader,
                feed_order=self.feed_order
            )
            logger.info("In step: %d, test loss: %f", self.step, metrics[0])
            self.trainer.save_params(config['model_save_dir'])
            logger.info("Epoch %d end.", event.epoch)
        if isinstance(event, tnr.EndStepEvent):
            # Record the loss first: the original skipped appending on each
            # logging step, dropping that sample from the running mean.
            self.metrics.append(event.metrics[0])
            if self.step > 0 and self.step % config['log_steps'] == 0:
                logger.info("In step: %d, train loss: %f", self.step, np.mean(self.metrics))
                self.metrics = []
            self.step += 1
训练代码
以下代码使用 trainer 进行训练,根据数据集划分方式,定义两个方法:random_test 和 k_fold_test,对两种数据集划分方式进行训练。
# Training setup
# Fall back to training from scratch when the checkpoint directory is absent.
if config['param_path'] and not os.path.isdir(config['param_path']):
    logger.info('config.param_path not exist, setting to None')
    config['param_path'] = None

if config['use_gpu']:
    place = fluid.CUDAPlace(0)
else:
    place = fluid.CPUPlace()

trainer = tnr.Trainer(
    train_func=train_func,
    place=place,
    param_path=config['param_path'],
    optimizer_func=optimizer)
def random_test(trainer, rate):
    """Train on a random split where `rate` of each class is held out.

    :param trainer: the Trainer that runs the training loop
    :param rate: fraction of the dataset reserved as the test set
    """
    assert isinstance(trainer, tnr.Trainer)
    raw_train_reader, raw_test_reader = create_rate_readers(rate)

    def batchify(reader):
        # Shared shuffle-then-batch pipeline for both readers.
        shuffled = paddle.reader.shuffle(reader, config['shuffle_size'])
        return paddle.batch(shuffled, batch_size=config['batch_size'])

    train_reader = batchify(raw_train_reader)
    test_reader = batchify(raw_test_reader)
    handler = TrainerEventHandler(config['feed_order'], test_reader, trainer)
    trainer.train(
        reader=train_reader,
        num_epochs=config['num_epochs'],
        event_handler=handler,
        feed_order=config['feed_order'])
    logger.info('Train end.')
def k_fold_test(trainer, k):
    """Run k-fold cross-validated training with the given trainer.

    :param trainer: the Trainer that runs each fold
    :param k: number of folds
    """
    assert isinstance(trainer, tnr.Trainer)
    folds = create_k_fold_readers(k)
    for count, (raw_train_reader, raw_test_reader) in enumerate(folds):
        logger.info("------------------------------------------------")
        logger.info("Round %d begin:", count)
        logger.info('')

        def batchify(reader):
            # Shared shuffle-then-batch pipeline for both readers.
            shuffled = paddle.reader.shuffle(reader, config['shuffle_size'])
            return paddle.batch(shuffled, batch_size=config['batch_size'])

        train_reader = batchify(raw_train_reader)
        test_reader = batchify(raw_test_reader)
        handler = TrainerEventHandler(config['feed_order'], test_reader, trainer)
        trainer.train(
            reader=train_reader,
            num_epochs=config['num_epochs'],
            event_handler=handler,
            feed_order=config['feed_order'])
        logger.info('Train end.')
# Default: train with a random 70/30 split; swap the comments below to run
# 2-fold cross-validation instead.
random_test(trainer, 0.3)
# k_fold_test(trainer, 2)
# Model evaluation code
def test():
    """Evaluate the trained model on a fresh random 30% test split."""
    # Build the test-data reader.
    _, raw_test_reader = create_rate_readers(0.3)
    shuffled = paddle.reader.shuffle(raw_test_reader, config['shuffle_size'])
    test_reader = paddle.batch(shuffled, batch_size=config['batch_size'])
    metrics = trainer.test(reader=test_reader, feed_order=config['feed_order'])
    logger.info("Test loss: %s", metrics[0])


test()
2019-08-31 20:37:24,106 - <ipython-input-9-04d2c22354fb>[line:14] - INFO: Test loss: 0.1035391891792026
# Model-freezing code: export the inference program to
# config['model_freeze_dir'], feeding variables 'x' and 'y' and fetching
# outputs 0 and 1 ([loss, predict] as returned by train_func).
trainer.save_inference_model(config['model_freeze_dir'], ['x', 'y'], [0, 1])
# Model-prediction code
def infer():
    """Load the frozen model and predict the label of one random test image."""
    device = fluid.CUDAPlace(0) if config['use_gpu'] else fluid.CPUPlace()
    executor = fluid.Executor(device)
    executor.run(fluid.default_startup_program())
    # Load the model frozen earlier by save_inference_model.
    [inference_program, feed_target_names, fetch_list] = fluid.io.load_inference_model(
        dirname=config['model_freeze_dir'], executor=executor)
    # Build a batch-size-1 reader over a random 30% test split.
    _, raw_test_reader = create_rate_readers(0.3)
    shuffled = paddle.reader.shuffle(raw_test_reader, config['shuffle_size'])
    infer_reader = paddle.batch(shuffled, batch_size=1)
    img, label = next(infer_reader())[0]
    # Undo the [-1, 1] normalization for display.
    plt.imshow(((img + 1.0) * 127.0).astype('uint8').reshape((64, 64, 3)))
    plt.show()
    logger.info('Label: %d' % label)
    feed = {
        feed_target_names[0]: img,
        feed_target_names[1]: np.array(label).reshape(1, 1),
    }
    result = executor.run(inference_program, fetch_list=fetch_list, feed=feed)
    logger.info('Predict: %d' % np.argmax(result[1]))


infer()
2019-08-31 20:38:31,375 - <ipython-input-34-4e4176630fe3>[line:22] - INFO: Label: 2
2019-08-31 20:38:31,379 - <ipython-input-34-4e4176630fe3>[line:25] - INFO: Predict: 2
使用AI Studio一键上手实践项目吧: https://aistudio.baidu.com/aistudio/projectdetail/127563
下载安装命令
## CPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/cpu paddlepaddle
## GPU版本安装命令
pip install -f https://paddlepaddle.org.cn/pip/oschina/gpu paddlepaddle-gpu
>> 访问 PaddlePaddle 官网,了解更多相关内容。