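Motivation:
I once tried to implement Mask RCNN on my own, but the paper builds on a long chain of prior work and my experience was limited, so my version did not work correctly. I therefore picked an open-source Mask RCNN implementation and transcribed its model-building code by hand to deepen my understanding. This post describes the structure of Mask RCNN purely from the implementation point of view.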
Project repository: https://github.com/matterport/Mask_RCNN
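The whole discussion revolves around model.py in the project's mrcnn module and follows the construction order of the build function of the MaskRCNN class. (First a summary of the idea in reverse order, then the details in forward order.)
First, state the problem clearly, then see how the structure follows from it. (This can be read as a conceptual summary of the build function in reverse order. Where a paragraph ends with a parenthesis after its final sentence, that parenthesis names the function or variable referenced in build.)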
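The problem: for a given image, obtain the bbox of every instance in it and the mask inside each bbox. The former corresponds to bounding-box parameter regression (smooth_l1_loss), the latter to a pixel-level binary_crossentropy loss. These two losses correspond directly to what the final task requires for single-class detection. (bbox_loss, mask_loss)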
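As a rough illustration of the two loss shapes named above, here is a minimal pure-Python sketch for scalar inputs. The function names and the eps clamp are my own choices for illustration; the project itself computes these on tensors.

```python
import math

def smooth_l1(y_true, y_pred):
    """Smooth L1: quadratic for small errors, linear for large ones."""
    diff = abs(y_true - y_pred)
    return 0.5 * diff ** 2 if diff < 1.0 else diff - 0.5

def binary_crossentropy(target, prob, eps=1e-7):
    """Per-pixel binary cross-entropy for a 0/1 target and a predicted probability.

    eps is an illustrative clamp that keeps log() away from zero.
    """
    prob = min(max(prob, eps), 1.0 - eps)
    return -(target * math.log(prob) + (1 - target) * math.log(1.0 - prob))

print(smooth_l1(0.0, 0.5), smooth_l1(0.0, 3.0))
print(binary_crossentropy(1, 0.5))
```

The quadratic region keeps gradients small near the target, while the linear region limits the influence of badly wrong boxes.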
With multiple classes, a classification loss is also needed to decide which class each instance belongs to. (class_loss)
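From the point of view of the overall model, two more losses are needed, which brings in the concrete structure; only a brief sketch is given here. For feature extraction, the final bbox loss and mask loss (bbox_loss and mask_loss in the source) each need a feature map that frames the object to be detected (its boundary is given by rois, the feature maps by feature_maps). These cropped features should keep, as far as possible, only the image features of the region to be detected, because such local information is better suited to deciding the class, the box size, and the mask.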
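So the next step is the construction of feature_maps and rois. The former is ordinary feature extraction and is not the focus here: in short, it is the set of feature maps output by the different stages of the fpn structure. (A pyramid network such as fpn is used because a single-path sequential network extracts insufficient features for small objects.) The focus is the construction of rois, which determines how feature_maps are "cropped". The cropped features differ in size, so before they enter the feed-forward layers they are brought to a common size with ROI pooling. In PyramidROIAlign this involves using the h and w of each roi to choose a particular feature_map from feature_maps; the details are deferred (PyramidROIAlign). (fpn_classifier_graph, build_fpn_mask_graph)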
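Now to how rois are produced. rois (regions of interest) are given by their boundaries (y1, x1, y2, x2). They can be seen as proposals filtered by their iou with the ground truth bboxes: the filter keeps a high-iou part and a low-iou part. The high-iou part (positive) is given the variables used to build the later losses (roi_gt_class_ids, deltas, masks). For the low-iou part (negative) those variables are zero padded, so negatives do not contribute to the bbox and mask losses. (The handling of crowd annotations in the coco dataset is skipped here and below.) (detection_targets_graph)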
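The positive/negative split can be sketched in plain Python as follows. The 0.5 threshold matches the one in detection_targets_graph quoted later; the helper names iou and split_by_iou are illustrative and are not functions of the project.

```python
def iou(box_a, box_b):
    """Intersection over union of two boxes given as (y1, x1, y2, x2)."""
    y1 = max(box_a[0], box_b[0])
    x1 = max(box_a[1], box_b[1])
    y2 = min(box_a[2], box_b[2])
    x2 = min(box_a[3], box_b[3])
    intersection = max(y2 - y1, 0) * max(x2 - x1, 0)
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - intersection
    return intersection / union if union > 0 else 0.0

def split_by_iou(proposals, gt_boxes, threshold=0.5):
    """Return (positive_indices, negative_indices) by best IoU with any GT box."""
    positive, negative = [], []
    for index, proposal in enumerate(proposals):
        best = max(iou(proposal, gt) for gt in gt_boxes)
        (positive if best >= threshold else negative).append(index)
    return positive, negative

gt = [(0, 0, 10, 10)]
proposals = [(0, 0, 10, 10), (0, 0, 10, 4), (20, 20, 30, 30)]
print(split_by_iou(proposals, gt))
```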
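Now to how proposals are produced. rois are just the proposals with relatively high or low iou, and proposals themselves are derived from anchors. Anchors can be regarded as fixed boxes (constants) generated at every location of the feature pyramid from the aspect ratios (config.RPN_ANCHOR_RATIOS) and scales (config.RPN_ANCHOR_SCALES). These constant boxes go through two operations, filtering and fine-tuning. Filtering computes a background score and a foreground score, keeps the anchors with high foreground scores, and then applies nms; the "fine-tuning" of anchors corresponds to apply_box_deltas_graph. (For anchor generation see samples/coco/inspect_data.ipynb; get_anchors of the MaskRCNN class calls the same methods directly.) (ProposalLayer)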
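The "fine-tuning" step can be illustrated on a single box. This is a pure-Python sketch of applying (dy, dx, log(dh), log(dw)) deltas to one (y1, x1, y2, x2) box, following the delta layout stated in the docstrings quoted later; the function name apply_box_deltas is illustrative, and the project version operates on tensors.

```python
import math

def apply_box_deltas(box, deltas):
    """Shift the box center by (dy, dx) box sizes and rescale by exp(log-scale)."""
    y1, x1, y2, x2 = box
    dy, dx, log_dh, log_dw = deltas
    height, width = y2 - y1, x2 - x1
    center_y = y1 + 0.5 * height + dy * height
    center_x = x1 + 0.5 * width + dx * width
    height *= math.exp(log_dh)
    width *= math.exp(log_dw)
    new_y1 = center_y - 0.5 * height
    new_x1 = center_x - 0.5 * width
    return (new_y1, new_x1, new_y1 + height, new_x1 + width)

# Moving the center down by half a box height shifts y1 and y2 by 1.0.
print(apply_box_deltas((0.0, 0.0, 2.0, 2.0), (0.5, 0.0, 0.0, 0.0)))
```

Expressing shifts relative to the box size and sizes in log space keeps the regression targets on a similar scale for small and large anchors.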
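Now to rpn_class, rpn_bbox, and anchors, the three inputs with which ProposalLayer is called. The last one comes from get_anchors and is not discussed further. rpn_class and rpn_bbox (scores and fine-tuning deltas) are obtained by applying the model from build_rpn_model to each feature_map in feature_maps and concatenating the results along the h * w * anchors_per_location dimension. Every stage is therefore taken into account, which makes it less likely that an instance or bbox is missed; filtering is left to later parts of the network. This contrasts with PyramidROIAlign, which selects a single level by h and w. (build_rpn_model)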
This brings us to the root, the construction of feature_maps, which is covered in the forward-order description below.
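The "two more losses from the point of view of the overall model" mentioned above are rpn_class_loss (the loss that drives the background and foreground scores) and rpn_bbox_loss (a smooth_l1_loss that supervises the fine-tuning output rpn_bbox). (rpn_class_loss, rpn_bbox_loss)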
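This summary packs in a lot of information, so it is best read alongside the code. I also recommend the Mask RCNN structure walkthrough video by 鱿鱼哥 on bilibili:
https://www.bilibili.com/video/av24795835/?p=2
The network diagrams in that video are quite clear and go well with this text.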
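Next, be clear about what data goes in. To clarify the input format of the build function, here is a brief summary of the data produced by data_generator, based mainly on samples/coco/inspect_data.ipynb. Some data processing is involved.
First, some scattered details about data_generator and anchors: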
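Image augmentation can be applied to the image directly with imgaug. Augmenting the labels in sync is relatively simple for pixel-level labels: freeze the random augmentation with augmentation.to_deterministic() and apply the same transform to the mask. For that reason the bbox labels are not transformed separately; they are recomputed from the transformed mask with np.any (utils.extract_bboxes).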
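Recovering a bbox from a mask can be sketched without NumPy. The version below works on a nested list of 0/1 values and mirrors the idea of scanning for any non-zero row and column; the name extract_bbox and the nested-list input are assumptions made for illustration.

```python
def extract_bbox(mask):
    """Return (y1, x1, y2, x2) of the non-zero region; y2 and x2 are exclusive."""
    rows = [i for i, row in enumerate(mask) if any(row)]
    cols = [j for j in range(len(mask[0])) if any(row[j] for row in mask)]
    if not rows:
        # Empty mask (for example after cropping): fall back to an all-zero box.
        return (0, 0, 0, 0)
    return (rows[0], cols[0], rows[-1] + 1, cols[-1] + 1)

mask = [
    [0, 0, 0, 0, 0],
    [0, 1, 1, 0, 0],
    [0, 0, 1, 1, 0],
    [0, 0, 0, 0, 0],
]
print(extract_bbox(mask))
```

Because the box is derived from the already transformed mask, it stays tight around the object even after rotations or flips.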
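Every image sample has an image_meta describing its metadata. Generating this metadata goes together with basic image processing such as resizing (utils.resize_image, utils.resize_mask). resize_image also returns a window: resizing normally uses the square mode to preserve the aspect ratio, and window describes where the image sits inside the padding after resizing. The window is used in refine_detections_graph (DetectionLayer, used at inference) to clip the rois (clip_boxes_graph) and obtain the final estimates.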
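To make the role of window concrete, here is a sketch of the arithmetic as I understand the square mode: scale the image so that it fits a max_dim x max_dim canvas, center it, and record where it landed. The function name and the exact rounding are assumptions and are not copied from utils.resize_image.

```python
def square_resize_window(h, w, min_dim, max_dim):
    """Return ((y1, x1, y2, x2), scale) of the image area inside the padded square."""
    # Scale up so the short side reaches min_dim, never scale down here.
    scale = max(1.0, min_dim / min(h, w))
    # If the long side would overflow the canvas, fit it to max_dim instead.
    if round(max(h, w) * scale) > max_dim:
        scale = max_dim / max(h, w)
    new_h, new_w = round(h * scale), round(w * scale)
    top = (max_dim - new_h) // 2
    left = (max_dim - new_w) // 2
    return (top, left, top + new_h, left + new_w), scale

print(square_resize_window(480, 640, 800, 1024))
```

For a 480x640 image the padded rows above and below the window contain no image content, which is why detections are clipped to the window at inference.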
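The use_mini_mask argument of modellib.load_image_gt makes it generate 56x56 masks. The reasoning is that masks consisting mostly of zeros waste storage at the original size; the price is that mini masks are somewhat blurrier.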
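In get_anchors, anchors are generated by first computing the shapes of the feature pyramid levels (modellib.compute_backbone_shapes) and then calling utils.generate_pyramid_anchors, which combines the ratios and scales at each location (in this implementation each pyramid level gets one scale). config.RPN_ANCHOR_STRIDE sets the stride over the pixels of each backbone feature map.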
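A minimal sketch of anchor generation for a single pyramid level, assuming one scale per level and anchors centered on feature-map cells. The name generate_anchors_for_level and the loop order are illustrative; the project builds the same kind of boxes with NumPy, and its ordering may differ.

```python
import math

def generate_anchors_for_level(scale, ratios, shape, feature_stride, anchor_stride=1):
    """Return (y1, x1, y2, x2) anchors in image pixels for one feature map.

    scale: anchor side length in pixels for ratio 1.
    ratios: width / height ratios.
    shape: (height, width) of the feature map.
    feature_stride: image pixels per feature-map cell.
    anchor_stride: step over feature-map cells (1 means every cell).
    """
    anchors = []
    for y in range(0, shape[0], anchor_stride):
        for x in range(0, shape[1], anchor_stride):
            center_y, center_x = y * feature_stride, x * feature_stride
            for ratio in ratios:
                height = scale / math.sqrt(ratio)
                width = scale * math.sqrt(ratio)
                anchors.append((center_y - 0.5 * height, center_x - 0.5 * width,
                                center_y + 0.5 * height, center_x + 0.5 * width))
    return anchors

anchors = generate_anchors_for_level(32, [0.5, 1, 2], (4, 4), feature_stride=4)
print(len(anchors), anchors[1])
```

Dividing and multiplying by sqrt(ratio) changes the aspect ratio while keeping the anchor area close to scale squared.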
data_generator yields inputs: (images, image_meta, rpn_match, rpn_bbox, gt_class_ids, gt_boxes, gt_masks). Here rpn_match and rpn_bbox are the rpn targets obtained by matching anchors against the ground truth bboxes (build_rpn_targets), and they are used to compute rpn_class_loss and rpn_bbox_loss. The meaning of the other items is self-evident.
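This gives a rough picture of the model structure and the data input. Now for the details in forward order (the order in which build defines things).
First, the build function in full: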
def build(self, mode, config):
    """Build Mask R-CNN architecture.
        input_shape: The shape of the input image.
        mode: Either "training" or "inference". The inputs and
            outputs of the model differ accordingly.
    """
    assert mode in ['training', 'inference']
    # Image size must be dividable by 2 multiple times
    h, w = config.IMAGE_SHAPE[:2]
    if h / 2**6 != int(h / 2**6) or w / 2**6 != int(w / 2**6):
        raise Exception("Image size must be dividable by 2 at least 6 times "
                        "to avoid fractions when downscaling and upscaling."
                        "For example, use 256, 320, 384, 448, 512, ... etc. ")
    # Inputs
    input_image = KL.Input(
        shape=[None, None, config.IMAGE_SHAPE[2]], name="input_image")
    input_image_meta = KL.Input(shape=[config.IMAGE_META_SIZE],
                                name="input_image_meta")
    if mode == "training":
        # RPN GT
        input_rpn_match = KL.Input(
            shape=[None, 1], name="input_rpn_match", dtype=tf.int32)
        input_rpn_bbox = KL.Input(
            shape=[None, 4], name="input_rpn_bbox", dtype=tf.float32)
        # Detection GT (class IDs, bounding boxes, and masks)
        # 1. GT Class IDs (zero padded)
        input_gt_class_ids = KL.Input(
            shape=[None], name="input_gt_class_ids", dtype=tf.int32)
        # 2. GT Boxes in pixels (zero padded)
        # [batch, MAX_GT_INSTANCES, (y1, x1, y2, x2)] in image coordinates
        input_gt_boxes = KL.Input(
            shape=[None, 4], name="input_gt_boxes", dtype=tf.float32)
        # Normalize coordinates
        gt_boxes = KL.Lambda(lambda x: norm_boxes_graph(
            x, K.shape(input_image)[1:3]))(input_gt_boxes)
        # 3. GT Masks (zero padded)
        # [batch, height, width, MAX_GT_INSTANCES]
        if config.USE_MINI_MASK:
            input_gt_masks = KL.Input(
                shape=[config.MINI_MASK_SHAPE[0],
                       config.MINI_MASK_SHAPE[1], None],
                name="input_gt_masks", dtype=bool)
        else:
            input_gt_masks = KL.Input(
                shape=[config.IMAGE_SHAPE[0], config.IMAGE_SHAPE[1], None],
                name="input_gt_masks", dtype=bool)
    elif mode == "inference":
        # Anchors in normalized coordinates
        input_anchors = KL.Input(shape=[None, 4], name="input_anchors")
    # Build the shared convolutional layers.
    # Bottom-up Layers
    # Returns a list of the last layers of each stage, 5 in total.
    # Don't create the thead (stage 5), so we pick the 4th item in the list.
    if callable(config.BACKBONE):
        _, C2, C3, C4, C5 = config.BACKBONE(input_image, stage5=True,
                                            train_bn=config.TRAIN_BN)
    else:
        _, C2, C3, C4, C5 = resnet_graph(input_image, config.BACKBONE,
                                         stage5=True, train_bn=config.TRAIN_BN)
    # Top-down Layers
    # TODO: add assert to varify feature map sizes match what's in config
    P5 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c5p5')(C5)
    P4 = KL.Add(name="fpn_p4add")([
        KL.UpSampling2D(size=(2, 2), name="fpn_p5upsampled")(P5),
        KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c4p4')(C4)])
    P3 = KL.Add(name="fpn_p3add")([
        KL.UpSampling2D(size=(2, 2), name="fpn_p4upsampled")(P4),
        KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c3p3')(C3)])
    P2 = KL.Add(name="fpn_p2add")([
        KL.UpSampling2D(size=(2, 2), name="fpn_p3upsampled")(P3),
        KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c2p2')(C2)])
    # Attach 3x3 conv to all P layers to get the final feature maps.
    P2 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p2")(P2)
    P3 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p3")(P3)
    P4 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p4")(P4)
    P5 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p5")(P5)
    # P6 is used for the 5th anchor scale in RPN. Generated by
    # subsampling from P5 with stride of 2.
    P6 = KL.MaxPooling2D(pool_size=(1, 1), strides=2, name="fpn_p6")(P5)
    # Note that P6 is used in RPN, but not in the classifier heads.
    rpn_feature_maps = [P2, P3, P4, P5, P6]
    mrcnn_feature_maps = [P2, P3, P4, P5]
    # Anchors
    if mode == "training":
        anchors = self.get_anchors(config.IMAGE_SHAPE)
        # Duplicate across the batch dimension because Keras requires it
        # TODO: can this be optimized to avoid duplicating the anchors?
        anchors = np.broadcast_to(anchors, (config.BATCH_SIZE,) + anchors.shape)
        # A hack to get around Keras's bad support for constants
        anchors = KL.Lambda(lambda x: tf.Variable(anchors), name="anchors")(input_image)
    else:
        anchors = input_anchors
    # RPN Model
    rpn = build_rpn_model(config.RPN_ANCHOR_STRIDE,
                          len(config.RPN_ANCHOR_RATIOS), config.TOP_DOWN_PYRAMID_SIZE)
    # Loop through pyramid layers
    layer_outputs = []  # list of lists
    for p in rpn_feature_maps:
        layer_outputs.append(rpn([p]))
    # Concatenate layer outputs
    # Convert from list of lists of level outputs to list of lists
    # of outputs across levels.
    # e.g. [[a1, b1, c1], [a2, b2, c2]] => [[a1, a2], [b1, b2], [c1, c2]]
    output_names = ["rpn_class_logits", "rpn_class", "rpn_bbox"]
    outputs = list(zip(*layer_outputs))
    outputs = [KL.Concatenate(axis=1, name=n)(list(o))
               for o, n in zip(outputs, output_names)]
    rpn_class_logits, rpn_class, rpn_bbox = outputs
    # Generate proposals
    # Proposals are [batch, N, (y1, x1, y2, x2)] in normalized coordinates
    # and zero padded.
    proposal_count = config.POST_NMS_ROIS_TRAINING if mode == "training"\
        else config.POST_NMS_ROIS_INFERENCE
    rpn_rois = ProposalLayer(
        proposal_count=proposal_count,
        nms_threshold=config.RPN_NMS_THRESHOLD,
        name="ROI",
        config=config)([rpn_class, rpn_bbox, anchors])
    if mode == "training":
        # Class ID mask to mark class IDs supported by the dataset the image
        # came from.
        active_class_ids = KL.Lambda(
            lambda x: parse_image_meta_graph(x)["active_class_ids"]
            )(input_image_meta)
        if not config.USE_RPN_ROIS:
            # Ignore predicted ROIs and use ROIs provided as an input.
            input_rois = KL.Input(shape=[config.POST_NMS_ROIS_TRAINING, 4],
                                  name="input_roi", dtype=np.int32)
            # Normalize coordinates
            target_rois = KL.Lambda(lambda x: norm_boxes_graph(
                x, K.shape(input_image)[1:3]))(input_rois)
        else:
            target_rois = rpn_rois
        # Generate detection targets
        # Subsamples proposals and generates target outputs for training
        # Note that proposal class IDs, gt_boxes, and gt_masks are zero
        # padded. Equally, returned rois and targets are zero padded.
        rois, target_class_ids, target_bbox, target_mask =\
            DetectionTargetLayer(config, name="proposal_targets")([
                target_rois, input_gt_class_ids, gt_boxes, input_gt_masks])
        # Network Heads
        # TODO: verify that this handles zero padded ROIs
        mrcnn_class_logits, mrcnn_class, mrcnn_bbox =\
            fpn_classifier_graph(rois, mrcnn_feature_maps, input_image_meta,
                                 config.POOL_SIZE, config.NUM_CLASSES,
                                 train_bn=config.TRAIN_BN,
                                 fc_layers_size=config.FPN_CLASSIF_FC_LAYERS_SIZE)
        mrcnn_mask = build_fpn_mask_graph(rois, mrcnn_feature_maps,
                                          input_image_meta,
                                          config.MASK_POOL_SIZE,
                                          config.NUM_CLASSES,
                                          train_bn=config.TRAIN_BN)
        # TODO: clean up (use tf.identify if necessary)
        output_rois = KL.Lambda(lambda x: x * 1, name="output_rois")(rois)
        # Losses
        rpn_class_loss = KL.Lambda(lambda x: rpn_class_loss_graph(*x), name="rpn_class_loss")(
            [input_rpn_match, rpn_class_logits])
        rpn_bbox_loss = KL.Lambda(lambda x: rpn_bbox_loss_graph(config, *x), name="rpn_bbox_loss")(
            [input_rpn_bbox, input_rpn_match, rpn_bbox])
        class_loss = KL.Lambda(lambda x: mrcnn_class_loss_graph(*x), name="mrcnn_class_loss")(
            [target_class_ids, mrcnn_class_logits, active_class_ids])
        bbox_loss = KL.Lambda(lambda x: mrcnn_bbox_loss_graph(*x), name="mrcnn_bbox_loss")(
            [target_bbox, target_class_ids, mrcnn_bbox])
        mask_loss = KL.Lambda(lambda x: mrcnn_mask_loss_graph(*x), name="mrcnn_mask_loss")(
            [target_mask, target_class_ids, mrcnn_mask])
        # Model
        inputs = [input_image, input_image_meta,
                  input_rpn_match, input_rpn_bbox, input_gt_class_ids, input_gt_boxes, input_gt_masks]
        if not config.USE_RPN_ROIS:
            inputs.append(input_rois)
        outputs = [rpn_class_logits, rpn_class, rpn_bbox,
                   mrcnn_class_logits, mrcnn_class, mrcnn_bbox, mrcnn_mask,
                   rpn_rois, output_rois,
                   rpn_class_loss, rpn_bbox_loss, class_loss, bbox_loss, mask_loss]
        model = KM.Model(inputs, outputs, name='mask_rcnn')
    else:
        # Network Heads
        # Proposal classifier and BBox regressor heads
        mrcnn_class_logits, mrcnn_class, mrcnn_bbox =\
            fpn_classifier_graph(rpn_rois, mrcnn_feature_maps, input_image_meta,
                                 config.POOL_SIZE, config.NUM_CLASSES,
                                 train_bn=config.TRAIN_BN,
                                 fc_layers_size=config.FPN_CLASSIF_FC_LAYERS_SIZE)
        # Detections
        # output is [batch, num_detections, (y1, x1, y2, x2, class_id, score)] in
        # normalized coordinates
        detections = DetectionLayer(config, name="mrcnn_detection")(
            [rpn_rois, mrcnn_class, mrcnn_bbox, input_image_meta])
        # Create masks for detections
        detection_boxes = KL.Lambda(lambda x: x[..., :4])(detections)
        mrcnn_mask = build_fpn_mask_graph(detection_boxes, mrcnn_feature_maps,
                                          input_image_meta,
                                          config.MASK_POOL_SIZE,
                                          config.NUM_CLASSES,
                                          train_bn=config.TRAIN_BN)
        model = KM.Model([input_image, input_image_meta, input_anchors],
                         [detections, mrcnn_class, mrcnn_bbox,
                          mrcnn_mask, rpn_rois, rpn_class, rpn_bbox],
                         name='mask_rcnn')
    # Add multi-GPU support.
    if config.GPU_COUNT > 1:
        from mrcnn.parallel_model import ParallelModel
        model = ParallelModel(model, config.GPU_COUNT)
    return model
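Now to the walkthrough. (The input section is skipped; things are described in order, and anything not mentioned is simply skipped.)
Building the fpn base structure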
_, C2, C3, C4, C5 = resnet_graph(input_image, config.BACKBONE,
                                 stage5=True, train_bn=config.TRAIN_BN)
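The left side of the fpn is basically a stack of residual blocks; going bottom-up, the spatial shape shrinks and the number of channels grows (giving C2, C3, C4, C5). The right side consists of two steps: a 1x1 convolution to unify the channel count, and merging by upsampling the higher-level feature and adding it directly. This yields P2, P3, P4, P5, after which each Pi goes through a 3x3 convolution; P6 is obtained by subsampling P5 with stride 2. The result is rpn_feature_maps and mrcnn_feature_maps. The corresponding code: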
# Top-down Layers
# TODO: add assert to varify feature map sizes match what's in config
P5 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c5p5')(C5)
P4 = KL.Add(name="fpn_p4add")([
    KL.UpSampling2D(size=(2, 2), name="fpn_p5upsampled")(P5),
    KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c4p4')(C4)])
P3 = KL.Add(name="fpn_p3add")([
    KL.UpSampling2D(size=(2, 2), name="fpn_p4upsampled")(P4),
    KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c3p3')(C3)])
P2 = KL.Add(name="fpn_p2add")([
    KL.UpSampling2D(size=(2, 2), name="fpn_p3upsampled")(P3),
    KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (1, 1), name='fpn_c2p2')(C2)])
# Attach 3x3 conv to all P layers to get the final feature maps.
P2 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p2")(P2)
P3 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p3")(P3)
P4 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p4")(P4)
P5 = KL.Conv2D(config.TOP_DOWN_PYRAMID_SIZE, (3, 3), padding="SAME", name="fpn_p5")(P5)
# P6 is used for the 5th anchor scale in RPN. Generated by
# subsampling from P5 with stride of 2.
P6 = KL.MaxPooling2D(pool_size=(1, 1), strides=2, name="fpn_p6")(P5)
# Note that P6 is used in RPN, but not in the classifier heads.
rpn_feature_maps = [P2, P3, P4, P5, P6]
mrcnn_feature_maps = [P2, P3, P4, P5]
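To see what spatial sizes P2 to P6 end up with, here is a small shape calculation. It assumes the strides (4, 8, 16, 32, 64) for P2 to P6, which I believe are the project defaults but which are not shown in the code above; the function name pyramid_shapes is illustrative. The divisibility check restates the 2**6 condition at the top of build.

```python
import math

def pyramid_shapes(image_shape, strides=(4, 8, 16, 32, 64)):
    """Return the (height, width) of each pyramid level for an input image size."""
    h, w = image_shape
    if h % 2 ** 6 or w % 2 ** 6:
        raise ValueError("image size must be divisible by 2 at least 6 times")
    return [(math.ceil(h / s), math.ceil(w / s)) for s in strides]

print(pyramid_shapes((1024, 1024)))
```

Each level halves the resolution of the previous one, which is why each top-down step needs exactly one 2x upsampling before the addition.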
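The next part uses rpn_feature_maps to build the rpn model (build_rpn_model). build_rpn_model calls rpn_graph, which, for every pixel of every rpn_feature_map, produces the foreground/background scores and the bbox deltas of the anchors at that location. The corresponding code: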
def rpn_graph(feature_map, anchors_per_location, anchor_stride):
    """Builds the computation graph of Region Proposal Network.
    feature_map: backbone features [batch, height, width, depth]
    anchors_per_location: number of anchors per pixel in the feature map
    anchor_stride: Controls the density of anchors. Typically 1 (anchors for
                   every pixel in the feature map), or 2 (every other pixel).
    Returns:
        rpn_class_logits: [batch, H * W * anchors_per_location, 2] Anchor classifier logits (before softmax)
        rpn_probs: [batch, H * W * anchors_per_location, 2] Anchor classifier probabilities.
        rpn_bbox: [batch, H * W * anchors_per_location, (dy, dx, log(dh), log(dw))] Deltas to be
                  applied to anchors.
    """
    # TODO: check if stride of 2 causes alignment issues if the feature map
    # is not even.
    # Shared convolutional base of the RPN
    shared = KL.Conv2D(512, (3, 3), padding='same', activation='relu',
                       strides=anchor_stride,
                       name='rpn_conv_shared')(feature_map)
    # Anchor Score. [batch, height, width, anchors per location * 2].
    x = KL.Conv2D(2 * anchors_per_location, (1, 1), padding='valid',
                  activation='linear', name='rpn_class_raw')(shared)
    # Reshape to [batch, anchors, 2]
    rpn_class_logits = KL.Lambda(
        lambda t: tf.reshape(t, [tf.shape(t)[0], -1, 2]))(x)
    # Softmax on last dimension of BG/FG.
    rpn_probs = KL.Activation(
        "softmax", name="rpn_class_xxx")(rpn_class_logits)
    # Bounding box refinement. [batch, H, W, anchors per location * depth]
    # where depth is [x, y, log(w), log(h)]
    x = KL.Conv2D(anchors_per_location * 4, (1, 1), padding="valid",
                  activation='linear', name='rpn_bbox_pred')(shared)
    # Reshape to [batch, anchors, 4]
    rpn_bbox = KL.Lambda(lambda t: tf.reshape(t, [tf.shape(t)[0], -1, 4]))(x)
    return [rpn_class_logits, rpn_probs, rpn_bbox]
The resulting outputs (rpn_class_logits, rpn_class, rpn_bbox) are concatenated in the order of the feature maps, and this order matches the order of the anchors (get_anchors). The corresponding code:
# Loop through pyramid layers
layer_outputs = []  # list of lists
for p in rpn_feature_maps:
    layer_outputs.append(rpn([p]))
# Concatenate layer outputs
# Convert from list of lists of level outputs to list of lists
# of outputs across levels.
# e.g. [[a1, b1, c1], [a2, b2, c2]] => [[a1, a2], [b1, b2], [c1, c2]]
output_names = ["rpn_class_logits", "rpn_class", "rpn_bbox"]
outputs = list(zip(*layer_outputs))
outputs = [KL.Concatenate(axis=1, name=n)(list(o))
           for o, n in zip(outputs, output_names)]
rpn_class_logits, rpn_class, rpn_bbox = outputs
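The regrouping with zip(*layer_outputs) and the length of the concatenated anchor axis can be checked with plain lists. The level shapes and the three anchors per location below are example values chosen for illustration, and both helper names are hypothetical.

```python
def regroup_by_output(layer_outputs):
    """Turn per-level [logits, probs, bbox] lists into per-output lists of levels."""
    return [list(group) for group in zip(*layer_outputs)]

def total_anchor_count(level_shapes, anchors_per_location):
    """Length of axis 1 after concatenating all levels: sum of H * W * anchors."""
    return sum(h * w * anchors_per_location for h, w in level_shapes)

print(regroup_by_output([["a1", "b1", "c1"], ["a2", "b2", "c2"]]))
print(total_anchor_count([(256, 256), (128, 128), (64, 64), (32, 32), (16, 16)], 3))
```

Because the levels are concatenated in pyramid order, the anchors must be generated in the same order for index i of rpn_bbox to refer to anchor i.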
ProposalLayer applies the deltas and filters by foreground score and nms. The corresponding part:
def call(self, inputs):
    # Box Scores. Use the foreground class confidence. [Batch, num_rois, 1]
    scores = inputs[0][:, :, 1]
    # Box deltas [batch, num_rois, 4]
    deltas = inputs[1]
    deltas = deltas * np.reshape(self.config.RPN_BBOX_STD_DEV, [1, 1, 4])
    # Anchors
    anchors = inputs[2]
    # Improve performance by trimming to top anchors by score
    # and doing the rest on the smaller subset.
    pre_nms_limit = tf.minimum(self.config.PRE_NMS_LIMIT, tf.shape(anchors)[1])
    ix = tf.nn.top_k(scores, pre_nms_limit, sorted=True,
                     name="top_anchors").indices
    scores = utils.batch_slice([scores, ix], lambda x, y: tf.gather(x, y),
                               self.config.IMAGES_PER_GPU)
    deltas = utils.batch_slice([deltas, ix], lambda x, y: tf.gather(x, y),
                               self.config.IMAGES_PER_GPU)
    pre_nms_anchors = utils.batch_slice([anchors, ix], lambda a, x: tf.gather(a, x),
                                        self.config.IMAGES_PER_GPU,
                                        names=["pre_nms_anchors"])
    # Apply deltas to anchors to get refined anchors.
    # [batch, N, (y1, x1, y2, x2)]
    boxes = utils.batch_slice([pre_nms_anchors, deltas],
                              lambda x, y: apply_box_deltas_graph(x, y),
                              self.config.IMAGES_PER_GPU,
                              names=["refined_anchors"])
    # Clip to image boundaries. Since we're in normalized coordinates,
    # clip to 0..1 range. [batch, N, (y1, x1, y2, x2)]
    window = np.array([0, 0, 1, 1], dtype=np.float32)
    boxes = utils.batch_slice(boxes,
                              lambda x: clip_boxes_graph(x, window),
                              self.config.IMAGES_PER_GPU,
                              names=["refined_anchors_clipped"])
    # Filter out small boxes
    # According to Xinlei Chen's paper, this reduces detection accuracy
    # for small objects, so we're skipping it.
    # Non-max suppression
    def nms(boxes, scores):
        indices = tf.image.non_max_suppression(
            boxes, scores, self.proposal_count,
            self.nms_threshold, name="rpn_non_max_suppression")
        proposals = tf.gather(boxes, indices)
        # Pad if needed
        padding = tf.maximum(self.proposal_count - tf.shape(proposals)[0], 0)
        proposals = tf.pad(proposals, [(0, padding), (0, 0)])
        return proposals
    proposals = utils.batch_slice([boxes, scores], nms,
                                  self.config.IMAGES_PER_GPU)
    return proposals
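The nms-then-pad step at the end of the layer can be sketched in pure Python for one image. This is a greedy non-max suppression over (y1, x1, y2, x2) boxes followed by zero padding up to proposal_count; it illustrates the idea and is not the TensorFlow op used above. The helper names are hypothetical.

```python
def box_iou(a, b):
    """Intersection over union of two (y1, x1, y2, x2) boxes."""
    inter_h = max(min(a[2], b[2]) - max(a[0], b[0]), 0.0)
    inter_w = max(min(a[3], b[3]) - max(a[1], b[1]), 0.0)
    inter = inter_h * inter_w
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0

def nms_with_padding(boxes, scores, proposal_count, threshold):
    """Keep the highest-scoring boxes that do not overlap a kept box too much."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        if len(keep) == proposal_count:
            break
        if all(box_iou(boxes[i], boxes[j]) <= threshold for j in keep):
            keep.append(i)
    proposals = [boxes[i] for i in keep]
    # Zero pad so that every image yields exactly proposal_count rows.
    proposals += [(0.0, 0.0, 0.0, 0.0)] * (proposal_count - len(proposals))
    return proposals

boxes = [(0.0, 0.0, 0.5, 0.5), (0.0, 0.0, 0.5, 0.45), (0.5, 0.5, 1.0, 1.0)]
print(nms_with_padding(boxes, [0.9, 0.8, 0.7], proposal_count=3, threshold=0.7))
```

The padding is what makes later code (for example trim_zeros_graph in detection_targets_graph) strip all-zero rows before use.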
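The feature preparation above is the same for training and inference. The training branch is described first.
The fine-tuned proposals obtained above are filtered by their iou with the ground truth bboxes into a positive part (high iou) and a negative part (low iou), and rois is built from the indices of both. For the positive part, the matching roi_gt_class_ids, deltas, and masks are gathered (they become target_class_ids, target_bbox, and target_mask, which feed the last three losses). The deltas are the refinement from each positive roi to its matched ground truth bbox, computed with utils.box_refinement_graph. (The counterpart at inference is refine_detections_graph in DetectionLayer, which applies the predicted deltas and clips refined_rois to the window.) The corresponding code: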
def detection_targets_graph(proposals, gt_class_ids, gt_boxes, gt_masks, config):
    """Generates detection targets for one image. Subsamples proposals and
    generates target class IDs, bounding box deltas, and masks for each.
    Inputs:
    proposals: [POST_NMS_ROIS_TRAINING, (y1, x1, y2, x2)] in normalized coordinates. Might
               be zero padded if there are not enough proposals.
    gt_class_ids: [MAX_GT_INSTANCES] int class IDs
    gt_boxes: [MAX_GT_INSTANCES, (y1, x1, y2, x2)] in normalized coordinates.
    gt_masks: [height, width, MAX_GT_INSTANCES] of boolean type.
    Returns: Target ROIs and corresponding class IDs, bounding box shifts,
    and masks.
    rois: [TRAIN_ROIS_PER_IMAGE, (y1, x1, y2, x2)] in normalized coordinates
    class_ids: [TRAIN_ROIS_PER_IMAGE]. Integer class IDs. Zero padded.
    deltas: [TRAIN_ROIS_PER_IMAGE, (dy, dx, log(dh), log(dw))]
    masks: [TRAIN_ROIS_PER_IMAGE, height, width]. Masks cropped to bbox
           boundaries and resized to neural network output size.
    Note: Returned arrays might be zero padded if not enough target ROIs.
    """
    # Assertions
    asserts = [
        tf.Assert(tf.greater(tf.shape(proposals)[0], 0), [proposals],
                  name="roi_assertion"),
    ]
    with tf.control_dependencies(asserts):
        proposals = tf.identity(proposals)
    # Remove zero padding
    proposals, _ = trim_zeros_graph(proposals, name="trim_proposals")
    gt_boxes, non_zeros = trim_zeros_graph(gt_boxes, name="trim_gt_boxes")
    gt_class_ids = tf.boolean_mask(gt_class_ids, non_zeros,
                                   name="trim_gt_class_ids")
    gt_masks = tf.gather(gt_masks, tf.where(non_zeros)[:, 0], axis=2,
                         name="trim_gt_masks")
    # Handle COCO crowds
    # A crowd box in COCO is a bounding box around several instances. Exclude
    # them from training. A crowd box is given a negative class ID.
    crowd_ix = tf.where(gt_class_ids < 0)[:, 0]
    non_crowd_ix = tf.where(gt_class_ids > 0)[:, 0]
    crowd_boxes = tf.gather(gt_boxes, crowd_ix)
    crowd_masks = tf.gather(gt_masks, crowd_ix, axis=2)
    gt_class_ids = tf.gather(gt_class_ids, non_crowd_ix)
    gt_boxes = tf.gather(gt_boxes, non_crowd_ix)
    gt_masks = tf.gather(gt_masks, non_crowd_ix, axis=2)
    # Compute overlaps matrix [proposals, gt_boxes]
    overlaps = overlaps_graph(proposals, gt_boxes)
    # Compute overlaps with crowd boxes [proposals, crowd_boxes]
    crowd_overlaps = overlaps_graph(proposals, crowd_boxes)
    crowd_iou_max = tf.reduce_max(crowd_overlaps, axis=1)
    no_crowd_bool = (crowd_iou_max < 0.001)
    # Determine positive and negative ROIs
    roi_iou_max = tf.reduce_max(overlaps, axis=1)
    # 1. Positive ROIs are those with >= 0.5 IoU with a GT box
    positive_roi_bool = (roi_iou_max >= 0.5)
    positive_indices = tf.where(positive_roi_bool)[:, 0]
    # 2. Negative ROIs are those with < 0.5 with every GT box. Skip crowds.
    negative_indices = tf.where(tf.logical_and(roi_iou_max < 0.5, no_crowd_bool))[:, 0]
    # Subsample ROIs. Aim for 33% positive
    # Positive ROIs
    positive_count = int(config.TRAIN_ROIS_PER_IMAGE *
                         config.ROI_POSITIVE_RATIO)
    positive_indices = tf.random_shuffle(positive_indices)[:positive_count]
    positive_count = tf.shape(positive_indices)[0]
    # Negative ROIs. Add enough to maintain positive:negative ratio.
    r = 1.0 / config.ROI_POSITIVE_RATIO
    negative_count = tf.cast(r * tf.cast(positive_count, tf.float32), tf.int32) - positive_count
    negative_indices = tf.random_shuffle(negative_indices)[:negative_count]
    # Gather selected ROIs
    positive_rois = tf.gather(proposals, positive_indices)
    negative_rois = tf.gather(proposals, negative_indices)
    # Assign positive ROIs to GT boxes.
    positive_overlaps = tf.gather(overlaps, positive_indices)
    roi_gt_box_assignment = tf.cond(
        tf.greater(tf.shape(positive_overlaps)[1], 0),
        true_fn = lambda: tf.argmax(positive_overlaps, axis=1),
        false_fn = lambda: tf.cast(tf.constant([]),tf.int64)
    )
    roi_gt_boxes = tf.gather(gt_boxes, roi_gt_box_assignment)
    roi_gt_class_ids = tf.gather(gt_class_ids, roi_gt_box_assignment)
    # Compute bbox refinement for positive ROIs
    deltas = utils.box_refinement_graph(positive_rois, roi_gt_boxes)
    deltas /= config.BBOX_STD_DEV
    # Assign positive ROIs to GT masks
    # Permute masks to [N, height, width, 1]
    transposed_masks = tf.expand_dims(tf.transpose(gt_masks, [2, 0, 1]), -1)
    # Pick the right mask for each ROI
    roi_masks = tf.gather(transposed_masks, roi_gt_box_assignment)
    # Compute mask targets
    boxes = positive_rois
    if config.USE_MINI_MASK:
        # Transform ROI coordinates from normalized image space
        # to normalized mini-mask space.
        y1, x1, y2, x2 = tf.split(positive_rois, 4, axis=1)
        gt_y1, gt_x1, gt_y2, gt_x2 = tf.split(roi_gt_boxes, 4, axis=1)
        gt_h = gt_y2 - gt_y1
        gt_w = gt_x2 - gt_x1
        y1 = (y1 - gt_y1) / gt_h
        x1 = (x1 - gt_x1) / gt_w
        y2 = (y2 - gt_y1) / gt_h
        x2 = (x2 - gt_x1) / gt_w
        boxes = tf.concat([y1, x1, y2, x2], 1)
    box_ids = tf.range(0, tf.shape(roi_masks)[0])
    masks = tf.image.crop_and_resize(tf.cast(roi_masks, tf.float32), boxes,
                                     box_ids,
                                     config.MASK_SHAPE)
    # Remove the extra dimension from masks.
    masks = tf.squeeze(masks, axis=3)
    # Threshold mask pixels at 0.5 to have GT masks be 0 or 1 to use with
    # binary cross entropy loss.
    masks = tf.round(masks)
    # Append negative ROIs and pad bbox deltas and masks that
    # are not used for negative ROIs with zeros.
    rois = tf.concat([positive_rois, negative_rois], axis=0)
    N = tf.shape(negative_rois)[0]
    P = tf.maximum(config.TRAIN_ROIS_PER_IMAGE - tf.shape(rois)[0], 0)
    rois = tf.pad(rois, [(0, P), (0, 0)])
    roi_gt_boxes = tf.pad(roi_gt_boxes, [(0, N + P), (0, 0)])
    roi_gt_class_ids = tf.pad(roi_gt_class_ids, [(0, N + P)])
    deltas = tf.pad(deltas, [(0, N + P), (0, 0)])
    masks = tf.pad(masks, [[0, N + P], (0, 0), (0, 0)])
    return rois, roi_gt_class_ids, deltas, masks
The part above therefore covers the region extraction built on rpn_feature_maps (rois) and the generation of the targets for the mrcnn heads (target_class_ids, target_bbox, target_mask).
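The subsampling arithmetic in detection_targets_graph is easy to misread, so here is the same counting in plain Python. The function name roi_sample_counts is hypothetical, and the numbers in the usage (200 ROIs per image, positive ratio 0.25) are example values chosen so the arithmetic is exact; the source comment aims for about 33% positives.

```python
def roi_sample_counts(num_positive, num_negative, rois_per_image, positive_ratio):
    """Return (positive, negative, padding) row counts for one image."""
    # Cap positives at the configured share of the ROI budget.
    positive = min(num_positive, int(rois_per_image * positive_ratio))
    # Negatives are derived from the positives actually available,
    # so the positive:negative ratio is kept even when positives are scarce.
    r = 1.0 / positive_ratio
    negative = min(num_negative, int(r * positive) - positive)
    # Whatever is left of the budget is zero padding.
    padding = max(rois_per_image - positive - negative, 0)
    return positive, negative, padding

print(roi_sample_counts(20, 500, 200, 0.25))
print(roi_sample_counts(80, 500, 200, 0.25))
```

With only 20 positives available, the batch is mostly padding rather than extra negatives, which is why the heads must tolerate zero-padded rois.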
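The next step crops mrcnn_feature_maps according to rois, pools the crops to a common size (ROI pooling), and feeds the aligned features into the fc layers. fpn_classifier_graph and build_fpn_mask_graph have a similar structure here; the key piece is PyramidROIAlign. Its main operation is to pick, based on the h and w of each roi, the feature_map of the matching level from mrcnn_feature_maps and call crop_and_resize to crop (to the bbox given by the roi) and resize (to config.POOL_SIZE). The pooled features of the different levels are concatenated and then sorted by batch index and box index, merged into a single sort key, which restores the original order of the boxes. Choosing the level by h and w is consistent with how the fpn extracts features: a larger roi maps to a higher pyramid level (whose features are more abstract and better suited to large objects), and vice versa. The corresponding code: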
def call(self, inputs):
    # Crop boxes [batch, num_boxes, (y1, x1, y2, x2)] in normalized coords
    boxes = inputs[0]
    # Image meta
    # Holds details about the image. See compose_image_meta()
    image_meta = inputs[1]
    # Feature Maps. List of feature maps from different level of the
    # feature pyramid. Each is [batch, height, width, channels]
    feature_maps = inputs[2:]
    # Assign each ROI to a level in the pyramid based on the ROI area.
    y1, x1, y2, x2 = tf.split(boxes, 4, axis=2)
    h = y2 - y1
    w = x2 - x1
    # Use shape of first image. Images in a batch must have the same size.
    image_shape = parse_image_meta_graph(image_meta)['image_shape'][0]
    # Equation 1 in the Feature Pyramid Networks paper. Account for
    # the fact that our coordinates are normalized here.
    # e.g. a 224x224 ROI (in pixels) maps to P4
    image_area = tf.cast(image_shape[0] * image_shape[1], tf.float32)
    roi_level = log2_graph(tf.sqrt(h * w) / (224.0 / tf.sqrt(image_area)))
    roi_level = tf.minimum(5, tf.maximum(
        2, 4 + tf.cast(tf.round(roi_level), tf.int32)))
    roi_level = tf.squeeze(roi_level, 2)
    # Loop through levels and apply ROI pooling to each. P2 to P5.
    pooled = []
    box_to_level = []
    for i, level in enumerate(range(2, 6)):
        ix = tf.where(tf.equal(roi_level, level))
        level_boxes = tf.gather_nd(boxes, ix)
        # Box indices for crop_and_resize.
        box_indices = tf.cast(ix[:, 0], tf.int32)
        # Keep track of which box is mapped to which level
        box_to_level.append(ix)
        # Stop gradient propogation to ROI proposals
        level_boxes = tf.stop_gradient(level_boxes)
        box_indices = tf.stop_gradient(box_indices)
        # Crop and Resize
        # From Mask R-CNN paper: "We sample four regular locations, so
        # that we can evaluate either max or average pooling. In fact,
        # interpolating only a single value at each bin center (without
        # pooling) is nearly as effective."
        #
        # Here we use the simplified approach of a single value per bin,
        # which is how it's done in tf.crop_and_resize()
        # Result: [batch * num_boxes, pool_height, pool_width, channels]
        pooled.append(tf.image.crop_and_resize(
            feature_maps[i], level_boxes, box_indices, self.pool_shape,
            method="bilinear"))
    # Pack pooled features into one tensor
    pooled = tf.concat(pooled, axis=0)
    # Pack box_to_level mapping into one array and add another
    # column representing the order of pooled boxes
    box_to_level = tf.concat(box_to_level, axis=0)
    box_range = tf.expand_dims(tf.range(tf.shape(box_to_level)[0]), 1)
    box_to_level = tf.concat([tf.cast(box_to_level, tf.int32), box_range],
                             axis=1)
    # Rearrange pooled features to match the order of the original boxes
    # Sort box_to_level by batch then box index
    # TF doesn't have a way to sort by two columns, so merge them and sort.
    sorting_tensor = box_to_level[:, 0] * 100000 + box_to_level[:, 1]
    ix = tf.nn.top_k(sorting_tensor, k=tf.shape(
        box_to_level)[0]).indices[::-1]
    ix = tf.gather(box_to_level[:, 2], ix)
    pooled = tf.gather(pooled, ix)
    # Re-add the batch dimension
    shape = tf.concat([tf.shape(boxes)[:2], tf.shape(pooled)[1:]], axis=0)
    pooled = tf.reshape(pooled, shape)
    return pooled
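The level assignment in the code above reduces to one formula per roi. Here is a scalar sketch of it in pure Python, with h and w in normalized coordinates as in the layer; the function name roi_level is illustrative.

```python
import math

def roi_level(h, w, image_height, image_width):
    """Pyramid level (2 to 5) for an ROI with normalized height h and width w."""
    image_area = float(image_height * image_width)
    # A 224x224 pixel ROI is the reference size and maps to level 4.
    level = math.log2(math.sqrt(h * w) / (224.0 / math.sqrt(image_area)))
    return min(5, max(2, 4 + round(level)))

for pixels in (32, 112, 224, 1024):
    side = pixels / 1024
    print(pixels, roi_level(side, side, 1024, 1024))
```

Halving the ROI side length moves it one level down to a finer feature map, and the clamp keeps very small and very large ROIs on P2 and P5 respectively.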
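With the aligned features, the classification and regression predictions can be built as follows. (TimeDistributed applies the wrapped layer along dimension 1, the second dimension, with shared weights; that is, all rois share the same weights.)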
def fpn_classifier_graph(rois, feature_maps, image_meta,
pool_size, num_classes, train_bn=True,
fc_layers_size=1024):
"""Builds the computation graph of the feature pyramid network classifier
and regressor heads.
rois: [batch, num_rois, (y1, x1, y2, x2)] Proposal boxes in normalized
coordinates.
feature_maps: List of feature maps from different layers of the pyramid,
[P2, P3, P4, P5]. Each has a different resolution.
image_meta: [batch, (meta data)] Image details. See compose_image_meta()
pool_size: The width of the square feature map generated from ROI Pooling.
num_classes: number of classes, which determines the depth of the results
train_bn: Boolean. Train or freeze Batch Norm layers
fc_layers_size: Size of the 2 FC layers
Returns:
logits: [batch, num_rois, NUM_CLASSES] classifier logits (before softmax)
probs: [batch, num_rois, NUM_CLASSES] classifier probabilities
bbox_deltas: [batch, num_rois, NUM_CLASSES, (dy, dx, log(dh), log(dw))] Deltas to apply to
proposal boxes
"""
# ROI Pooling
# Shape: [batch, num_rois, POOL_SIZE, POOL_SIZE, channels]
x = PyramidROIAlign([pool_size, pool_size],
name="roi_align_classifier")([rois, image_meta] + feature_maps)
# Two 1024 FC layers (implemented with Conv2D for consistency)
x = KL.TimeDistributed(KL.Conv2D(fc_layers_size, (pool_size, pool_size), padding="valid"),
name="mrcnn_class_conv1")(x)
x = KL.TimeDistributed(BatchNorm(), name='mrcnn_class_bn1')(x, training=train_bn)
x = KL.Activation('relu')(x)
x = KL.TimeDistributed(KL.Conv2D(fc_layers_size, (1, 1)),
name="mrcnn_class_conv2")(x)
x = KL.TimeDistributed(BatchNorm(), name='mrcnn_class_bn2')(x, training=train_bn)
x = KL.Activation('relu')(x)
shared = KL.Lambda(lambda x: K.squeeze(K.squeeze(x, 3), 2),
name="pool_squeeze")(x)
# Classifier head
mrcnn_class_logits = KL.TimeDistributed(KL.Dense(num_classes),
name='mrcnn_class_logits')(shared)
mrcnn_probs = KL.TimeDistributed(KL.Activation("softmax"),
name="mrcnn_class")(mrcnn_class_logits)
# BBox head
# [batch, num_rois, NUM_CLASSES * (dy, dx, log(dh), log(dw))]
x = KL.TimeDistributed(KL.Dense(num_classes * 4, activation='linear'),
name='mrcnn_bbox_fc')(shared)
# Reshape to [batch, num_rois, NUM_CLASSES, (dy, dx, log(dh), log(dw))]
s = K.int_shape(x)
mrcnn_bbox = KL.Reshape((s[1], num_classes, 4), name="mrcnn_bbox")(x)
return mrcnn_class_logits, mrcnn_probs, mrcnn_bbox
def build_fpn_mask_graph(rois, feature_maps, image_meta,
                         pool_size, num_classes, train_bn=True):
    """Builds the computation graph of the mask head of Feature Pyramid Network.

    rois: [batch, num_rois, (y1, x1, y2, x2)] Proposal boxes in normalized
          coordinates.
    feature_maps: List of feature maps from different layers of the pyramid,
                  [P2, P3, P4, P5]. Each has a different resolution.
    image_meta: [batch, (meta data)] Image details. See compose_image_meta()
    pool_size: The width of the square feature map generated from ROI Pooling.
    num_classes: number of classes, which determines the depth of the results
    train_bn: Boolean. Train or freeze Batch Norm layers

    Returns: Masks [batch, num_rois, MASK_POOL_SIZE, MASK_POOL_SIZE, NUM_CLASSES]
    """
    # ROI Pooling
    # Shape: [batch, num_rois, MASK_POOL_SIZE, MASK_POOL_SIZE, channels]
    x = PyramidROIAlign([pool_size, pool_size],
                        name="roi_align_mask")([rois, image_meta] + feature_maps)

    # Conv layers
    x = KL.TimeDistributed(KL.Conv2D(256, (3, 3), padding="same"),
                           name="mrcnn_mask_conv1")(x)
    x = KL.TimeDistributed(BatchNorm(),
                           name='mrcnn_mask_bn1')(x, training=train_bn)
    x = KL.Activation('relu')(x)

    x = KL.TimeDistributed(KL.Conv2D(256, (3, 3), padding="same"),
                           name="mrcnn_mask_conv2")(x)
    x = KL.TimeDistributed(BatchNorm(),
                           name='mrcnn_mask_bn2')(x, training=train_bn)
    x = KL.Activation('relu')(x)

    x = KL.TimeDistributed(KL.Conv2D(256, (3, 3), padding="same"),
                           name="mrcnn_mask_conv3")(x)
    x = KL.TimeDistributed(BatchNorm(),
                           name='mrcnn_mask_bn3')(x, training=train_bn)
    x = KL.Activation('relu')(x)

    x = KL.TimeDistributed(KL.Conv2D(256, (3, 3), padding="same"),
                           name="mrcnn_mask_conv4")(x)
    x = KL.TimeDistributed(BatchNorm(),
                           name='mrcnn_mask_bn4')(x, training=train_bn)
    x = KL.Activation('relu')(x)

    x = KL.TimeDistributed(KL.Conv2DTranspose(256, (2, 2), strides=2, activation="relu"),
                           name="mrcnn_mask_deconv")(x)
    x = KL.TimeDistributed(KL.Conv2D(num_classes, (1, 1), strides=1, activation="sigmoid"),
                           name="mrcnn_mask")(x)
    return x
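To see how the spatial size changes through the mask head, it can be traced layer by layer. This is a small sketch in plain Python, not part of the repository: the function names are hypothetical, pool_size=14 and num_classes=81 are example values only, and it assumes the Keras defaults for the layers above (in particular, Conv2DTranspose uses padding="valid" unless told otherwise).

```python
def conv_same(size):
    """A stride-1 convolution with padding="same" keeps the spatial size."""
    return size


def deconv_valid(size, kernel=2, stride=2):
    """Output length of a transposed convolution with padding="valid"."""
    return size * stride + max(kernel - stride, 0)


def mask_head_output_shape(num_rois, pool_size, num_classes):
    """Trace [num_rois, h, w, channels] through the mask head for one image."""
    size = pool_size                 # output of PyramidROIAlign
    for _ in range(4):               # mrcnn_mask_conv1 .. mrcnn_mask_conv4
        size = conv_same(size)
    size = deconv_valid(size, kernel=2, stride=2)   # mrcnn_mask_deconv
    # The final 1x1 convolution changes only the channel count.
    return (num_rois, size, size, num_classes)


if __name__ == "__main__":
    print(mask_head_output_shape(num_rois=100, pool_size=14, num_classes=81))
    # -> (100, 28, 28, 81)
```

Under these assumptions the transposed convolution doubles the pooled resolution, and every ROI receives one sigmoid mask per class; only the channel of the relevant class is used later.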
This yields two losses built on rpn_feature_maps and three losses built on mrcnn_feature_maps, as follows:
rpn_class_loss = KL.Lambda(lambda x: rpn_class_loss_graph(*x), name="rpn_class_loss")(
    [input_rpn_match, rpn_class_logits])
rpn_bbox_loss = KL.Lambda(lambda x: rpn_bbox_loss_graph(config, *x), name="rpn_bbox_loss")(
    [input_rpn_bbox, input_rpn_match, rpn_bbox])
class_loss = KL.Lambda(lambda x: mrcnn_class_loss_graph(*x), name="mrcnn_class_loss")(
    [target_class_ids, mrcnn_class_logits, active_class_ids])
bbox_loss = KL.Lambda(lambda x: mrcnn_bbox_loss_graph(*x), name="mrcnn_bbox_loss")(
    [target_bbox, target_class_ids, mrcnn_bbox])
mask_loss = KL.Lambda(lambda x: mrcnn_mask_loss_graph(*x), name="mrcnn_mask_loss")(
    [target_mask, target_class_ids, mrcnn_mask])
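The bbox and mask losses share one pattern: only positive ROIs (target class ID greater than 0) contribute, and for each of them only the prediction belonging to the target class is compared with the target. The sketch below illustrates this with plain Python lists for a single image. It is an assumption-laden simplification rather than the repository code: the function names are hypothetical, masks are flattened to 1-D pixel lists, and predictions are indexed class-first for readability.

```python
import math


def smooth_l1(y_true, y_pred):
    """Smooth L1 for one scalar: quadratic near zero, linear elsewhere."""
    diff = abs(y_true - y_pred)
    return 0.5 * diff ** 2 if diff < 1.0 else diff - 0.5


def binary_crossentropy(y_true, y_pred, eps=1e-7):
    """Binary cross-entropy for one pixel; the prediction is clipped for stability."""
    p = min(max(y_pred, eps), 1.0 - eps)
    return -(y_true * math.log(p) + (1.0 - y_true) * math.log(1.0 - p))


def bbox_loss(target_bbox, target_class_ids, pred_bbox):
    """Mean smooth L1 over positive ROIs, using the deltas of the target class."""
    terms = []
    for target, class_id, per_class in zip(target_bbox, target_class_ids, pred_bbox):
        if class_id > 0:  # class 0 marks background or zero padding
            terms.extend(smooth_l1(t, p) for t, p in zip(target, per_class[class_id]))
    return sum(terms) / len(terms) if terms else 0.0


def mask_loss(target_masks, target_class_ids, pred_masks):
    """Mean pixel-level binary cross-entropy over positive ROIs."""
    terms = []
    for target, class_id, per_class in zip(target_masks, target_class_ids, pred_masks):
        if class_id > 0:
            terms.extend(binary_crossentropy(t, p) for t, p in zip(target, per_class[class_id]))
    return sum(terms) / len(terms) if terms else 0.0


if __name__ == "__main__":
    # Two ROIs: the first is positive (class 1), the second is padding (class 0).
    target_bbox = [[0.0, 0.0, 0.0, 0.0], [9.0, 9.0, 9.0, 9.0]]
    class_ids = [1, 0]
    pred_bbox = [[[5.0] * 4, [0.5, 0.0, 0.0, 2.0]], [[0.0] * 4, [0.0] * 4]]
    print(bbox_loss(target_bbox, class_ids, pred_bbox))
```

Returning 0.0 when there are no positive ROIs keeps the loss defined for images without any matched instance.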
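This completes the model construction for the training phase. The inference phase is described briefly below.
In the inference phase, fpn_classifier_graph is called directly on the proposals (rpn_rois) to obtain the class probabilities and the bbox estimates. Then refine_detections_graph, used inside DetectionLayer, clips the boxes derived from mrcnn_bbox to the window stored in image_meta, which handles boxes that cross the image boundary, and filters the results with NMS. Finally, build_fpn_mask_graph is called on the filtered and clipped bbox estimates to generate mrcnn_mask. The corresponding code is: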
mrcnn_class_logits, mrcnn_class, mrcnn_bbox =\
    fpn_classifier_graph(rpn_rois, mrcnn_feature_maps, input_image_meta,
                         config.POOL_SIZE, config.NUM_CLASSES,
                         train_bn=config.TRAIN_BN,
                         fc_layers_size=config.FPN_CLASSIF_FC_LAYERS_SIZE)

# Detections
# output is [batch, num_detections, (y1, x1, y2, x2, class_id, score)] in
# normalized coordinates
detections = DetectionLayer(config, name="mrcnn_detection")(
    [rpn_rois, mrcnn_class, mrcnn_bbox, input_image_meta])

# Create masks for detections
detection_boxes = KL.Lambda(lambda x: x[..., :4])(detections)
mrcnn_mask = build_fpn_mask_graph(detection_boxes, mrcnn_feature_maps,
                                  input_image_meta,
                                  config.MASK_POOL_SIZE,
                                  config.NUM_CLASSES,
                                  train_bn=config.TRAIN_BN)
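The two post-processing ideas in the detection step, clipping boxes to the image window and suppressing overlapping boxes, can be sketched in a few lines of plain Python. This is only an illustration under simplifying assumptions: the function names are hypothetical, the NMS here is greedy and class-agnostic, and it ignores the confidence threshold and class IDs that the real layer also takes into account.

```python
def clip_box(box, window):
    """Clip a (y1, x1, y2, x2) box so that it lies inside the window."""
    y1, x1, y2, x2 = box
    wy1, wx1, wy2, wx2 = window
    return (min(max(y1, wy1), wy2), min(max(x1, wx1), wx2),
            min(max(y2, wy1), wy2), min(max(x2, wx1), wx2))


def iou(a, b):
    """Intersection over union of two (y1, x1, y2, x2) boxes."""
    inter_h = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    inter_w = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = inter_h * inter_w
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def nms(boxes, scores, threshold):
    """Greedy NMS: keep a box unless it overlaps an already kept, higher-scoring box."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        if all(iou(boxes[i], boxes[k]) <= threshold for k in keep):
            keep.append(i)
    return keep


if __name__ == "__main__":
    window = (0.0, 0.0, 1.0, 1.0)
    print(clip_box((-0.1, 0.2, 1.2, 0.9), window))
    boxes = [(0.0, 0.0, 10.0, 10.0), (1.0, 1.0, 11.0, 11.0), (20.0, 20.0, 30.0, 30.0)]
    scores = [0.9, 0.8, 0.7]
    print(nms(boxes, scores, threshold=0.5))
```

In the example, the second box overlaps the first with an IoU of about 0.68 and is dropped, while the third box does not overlap anything and is kept.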
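That is, broadly, the model structure of Mask RCNN.
This article focuses on the pure code logic and is somewhat dry (if you cannot get through the code, try copying it out by hand yourself). Below are some good references that combine the statistical meaning with the corresponding code logic; they are recommended reading.
Recommended reading: https://www.bilibili.com/video/av24795835
https://www.jiqizhixin.com/articles/Mask_RCNN-tree-master-samples-balloon
https://javaduqing.github.io/2018/04/13/%E3%80%90%E7%9B%AE%E6%A0%87%E6%A3%80%E6%B5%8B%E3%80%91Mask%20RCNN%E7%AE%97%E6%B3%95%E8%AF%A6%E8%A7%A3/
Small components can be understood by calling test routines and debugging them, stepping through the code and printing shapes. Below are some tests that use placeholders (run them inside model.py):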
def test_mrcnn_mask_loss_graph():
    """Mask binary cross-entropy loss for the masks head.

    target_masks: [batch, num_rois, height, width].
        A float32 tensor of values 0 or 1. Uses zero padding to fill array.
    target_class_ids: [batch, num_rois]. Integer class IDs. Zero padded.
    pred_masks: [batch, proposals, height, width, num_classes] float32 tensor
        with values from 0 to 1.
    """
    batch = 4
    num_rois = 32
    height = width = 224
    num_classes = 10
    proposals = 64
    target_masks = tf.placeholder(tf.float32, [batch, num_rois, height, width])
    target_class_ids = tf.placeholder(tf.int32, [batch, num_rois])
    pred_masks = tf.placeholder(tf.float32, [batch, proposals, height, width, num_classes])
    loss = mrcnn_mask_loss_graph(target_masks=target_masks,
                                 target_class_ids=target_class_ids,
                                 pred_masks=pred_masks)
    sess = tf.Session()
    loss = sess.run(loss, feed_dict={
        target_masks: np.random.random([batch, num_rois, height, width]).astype(np.float32),
        target_class_ids: np.random.randint(0, 10, size=[batch, num_rois]).astype(np.int32),
        pred_masks: np.random.random([batch, proposals, height, width, num_classes]).astype(np.float32)
    })
    print("loss :")
    print(loss)


def test_mrcnn_bbox_loss_graph():
    """Loss for Mask R-CNN bounding box refinement.

    target_bbox: [batch, num_rois, (dy, dx, log(dh), log(dw))]
    target_class_ids: [batch, num_rois]. Integer class IDs.
    pred_bbox: [batch, num_rois, num_classes, (dy, dx, log(dh), log(dw))]
    """
    num_rois = 64
    num_classes = 10
    target_bbox = tf.placeholder(tf.float32, [None, num_rois, 4])
    target_class_ids = tf.placeholder(tf.int32, [None, num_rois])
    pred_bbox = tf.placeholder(tf.float32, [None, num_rois, num_classes, 4])
    loss = mrcnn_bbox_loss_graph(target_bbox=target_bbox,
                                 target_class_ids=target_class_ids,
                                 pred_bbox=pred_bbox)
    print("loss :")
    print(loss)


def test_mrcnn_class_loss_graph():
    """Loss for the classifier head of Mask RCNN.

    target_class_ids: [batch, num_rois]. Integer class IDs. Uses zero
        padding to fill in the array.
    pred_class_logits: [batch, num_rois, num_classes]
    active_class_ids: [batch, num_classes]. Has a value of 1 for
        classes that are in the dataset of the image, and 0
        for classes that are not in the dataset.
        (IDs, but passed with float32 dtype.)
    """
    # During model building, Keras calls this function with
    # target_class_ids of type float32. Unclear why. Cast it
    # to int to get around it.
    num_rois = 64
    num_classes = 10
    target_class_ids = tf.placeholder(tf.int32, [None, num_rois])
    pred_class_logits = tf.placeholder(tf.float32, [None, num_rois, num_classes])
    active_class_ids = tf.placeholder(tf.float32, [None, num_classes])
    loss = mrcnn_class_loss_graph(target_class_ids=target_class_ids,
                                  pred_class_logits=pred_class_logits,
                                  active_class_ids=active_class_ids)
    print("loss :")
    print(loss)


def test_rpn_bbox_loss_graph():
    """Return the RPN bounding box loss graph.

    config: the model config object.
    target_bbox: [batch, max positive anchors, (dy, dx, log(dh), log(dw))].
        Uses 0 padding to fill in unused bbox deltas.
    rpn_match: [batch, anchors, 1]. Anchor match type. 1=positive,
        -1=negative, 0=neutral anchor.
    rpn_bbox: [batch, anchors, (dy, dx, log(dh), log(dw))]
    """
    # Positive anchors contribute to the loss, but negative and
    # neutral anchors (match value of 0 or -1) don't.
    from collections import namedtuple
    max_positive_anchors = 64
    anchors = 32
    target_bbox = tf.placeholder(tf.float32, [None, max_positive_anchors, 4])
    rpn_match = tf.placeholder(tf.float32, [None, anchors, 1])
    rpn_bbox = tf.placeholder(tf.float32, [None, anchors, 4])
    # Stand-in config; rpn_bbox_loss_graph in the referenced repo reads
    # config.IMAGES_PER_GPU.
    config = namedtuple("config", ["IMAGES_PER_GPU"])
    config.IMAGES_PER_GPU = 128
    loss = rpn_bbox_loss_graph(config=config, target_bbox=target_bbox,
                               rpn_match=rpn_match, rpn_bbox=rpn_bbox)
    print("loss :")
    print(loss)


def test_batch_pack_graph():
    num_rows = 10
    x = tf.placeholder(tf.float32, [num_rows, 128, 100])
    counts = np.random.randint(0, 100, size=[num_rows])
    concat = batch_pack_graph(x, counts, num_rows)
    print("concat :")
    print(concat)


def test_fpn_classifier_graph():
    """Builds the computation graph of the feature pyramid network classifier
    and regressor heads.

    rois: [batch, num_rois, (y1, x1, y2, x2)] Proposal boxes in normalized
          coordinates.
    feature_maps: List of feature maps from different layers of the pyramid,
                  [P2, P3, P4, P5]. Each has a different resolution.
    image_meta: [batch, (meta data)] Image details. See compose_image_meta()
    pool_size: The width of the square feature map generated from ROI Pooling.
    num_classes: number of classes, which determines the depth of the results
    train_bn: Boolean. Train or freeze Batch Norm layers
    fc_layers_size: Size of the 2 FC layers

    Returns:
        logits: [batch, num_rois, NUM_CLASSES] classifier logits (before softmax)
        probs: [batch, num_rois, NUM_CLASSES] classifier probabilities
        bbox_deltas: [batch, num_rois, NUM_CLASSES, (dy, dx, log(dh), log(dw))]
                     Deltas to apply to proposal boxes
    """
    batch = 32
    num_rois = 1024
    TOP_DOWN_PYRAMID_SIZE = 256
    feature_map_one = tf.placeholder(tf.float32, [batch, 224, 224, TOP_DOWN_PYRAMID_SIZE])
    feature_map_two = tf.placeholder(tf.float32, [batch, 128, 128, TOP_DOWN_PYRAMID_SIZE])
    feature_map_three = tf.placeholder(tf.float32, [batch, 64, 64, TOP_DOWN_PYRAMID_SIZE])
    feature_map_four = tf.placeholder(tf.float32, [batch, 32, 32, TOP_DOWN_PYRAMID_SIZE])
    feature_maps = [feature_map_one, feature_map_two, feature_map_three, feature_map_four]
    rois = tf.placeholder(tf.float32, [batch, num_rois, 4])
    image_meta = tf.placeholder(tf.float32, [batch, 12])
    pool_size = 7
    num_classes = 10
    mrcnn_class_logits, mrcnn_probs, mrcnn_bbox = fpn_classifier_graph(
        rois=rois, feature_maps=feature_maps,
        pool_size=pool_size, num_classes=num_classes,
        train_bn=True, fc_layers_size=1024,
        image_meta=image_meta)
    print("mrcnn_class_logits :")
    print(mrcnn_class_logits)
    print("mrcnn_probs :")
    print(mrcnn_probs)
    print("mrcnn_bbox :")
    print(mrcnn_bbox)


def test_TimeDistributed():
    # TimeDistributed shares the wrapped layer's weights along the time dimension.
    conv_layer = KL.Conv2D(filters=128, kernel_size=(3, 3),
                           input_shape=(224, 224, 3), padding="same")(
        tf.placeholder(tf.float32, [None, 224, 224, 3]))
    print("conv_layer :")
    print(conv_layer)
    timeDistributed_layer = KL.TimeDistributed(
        KL.Conv2D(filters=128, kernel_size=(3, 3),
                  input_shape=(224, 224, 3), padding="same"),
        input_shape=(224, 224, 3))(
        tf.placeholder(tf.float32, [None, 10, 224, 224, 3]))
    print("timeDistributed_layer :")
    print(timeDistributed_layer)


def test_rpn_graph():
    """Builds the computation graph of Region Proposal Network.

    feature_map: backbone features [batch, height, width, depth]
    anchors_per_location: number of anchors per pixel in the feature map
    anchor_stride: Controls the density of anchors. Typically 1 (anchors for
                   every pixel in the feature map), or 2 (every other pixel).

    Returns:
        rpn_class_logits: [batch, H * W * anchors_per_location, 2] Anchor classifier logits (before softmax)
        rpn_probs: [batch, H * W * anchors_per_location, 2] Anchor classifier probabilities.
        rpn_bbox: [batch, H * W * anchors_per_location, (dy, dx, log(dh), log(dw))] Deltas to be
                  applied to anchors.
    """
    # TODO: check if stride of 2 causes alignment issues if the feature map
    # is not even.
    # Shared convolutional base of the RPN
    batch = 32
    height = width = 224
    feature_map = tf.placeholder(tf.float32, [None, height, width, 3])
    anchors_per_location = 1
    anchor_stride = 1
    rpn_class_logits, rpn_probs, rpn_bbox = rpn_graph(feature_map,
                                                      anchors_per_location,
                                                      anchor_stride)
    print("rpn_class_logits :")
    print(rpn_class_logits)
    print("rpn_probs :")
    print(rpn_probs)
    print("rpn_bbox :")
    print(rpn_bbox)


def test_refine_detections_graph():
    """Refine classified proposals and filter overlaps and return final
    detections.

    Inputs:
        rois: [N, (y1, x1, y2, x2)] in normalized coordinates
        probs: [N, num_classes]. Class probabilities.
        deltas: [N, num_classes, (dy, dx, log(dh), log(dw))]. Class-specific
                bounding box deltas.
        window: (y1, x1, y2, x2) in normalized coordinates. The part of the image
            that contains the image excluding the padding.

    Returns detections shaped: [num_detections, (y1, x1, y2, x2, class_id, score)] where
        coordinates are normalized.
    """
    from collections import namedtuple
    N = 32
    num_classes = 10
    rois = tf.placeholder(tf.float32, [N, 4])
    probs = tf.placeholder(tf.float32, [N, num_classes])
    deltas = tf.placeholder(tf.float32, [N, num_classes, 4])
    window = tf.placeholder(tf.float32, [4])
    config = namedtuple("config", ["BBOX_STD_DEV", "DETECTION_MIN_CONFIDENCE",
                                   "DETECTION_MAX_INSTANCES", "DETECTION_NMS_THRESHOLD"])
    config.BBOX_STD_DEV = 1.0
    config.DETECTION_MAX_INSTANCES = 16
    config.DETECTION_MIN_CONFIDENCE = 0.5
    config.DETECTION_NMS_THRESHOLD = 0.7
    detection = refine_detections_graph(rois=rois, probs=probs, deltas=deltas,
                                        window=window, config=config)
    print("detection :")
    print(detection)


def test_detection_targets_graph():
    """Generates detection targets for one image. Subsamples proposals and
    generates target class IDs, bounding box deltas, and masks for each.

    Inputs:
    proposals: [POST_NMS_ROIS_TRAINING, (y1, x1, y2, x2)] in normalized coordinates. Might
               be zero padded if there are not enough proposals.
    gt_class_ids: [MAX_GT_INSTANCES] int class IDs
    gt_boxes: [MAX_GT_INSTANCES, (y1, x1, y2, x2)] in normalized coordinates.
    gt_masks: [height, width, MAX_GT_INSTANCES] of boolean type.

    Returns: Target ROIs and corresponding class IDs, bounding box shifts,
    and masks.
    rois: [TRAIN_ROIS_PER_IMAGE, (y1, x1, y2, x2)] in normalized coordinates
    class_ids: [TRAIN_ROIS_PER_IMAGE]. Integer class IDs. Zero padded.
    deltas: [TRAIN_ROIS_PER_IMAGE, (dy, dx, log(dh), log(dw))]
    masks: [TRAIN_ROIS_PER_IMAGE, height, width]. Masks cropped to bbox
           boundaries and resized to neural network output size.

    Note: Returned arrays might be zero padded if not enough target ROIs.
    """
    from collections import namedtuple
    POST_NMS_ROIS_TRAINING = 128
    MAX_GT_INSTANCES = 64
    height, width = 224, 224
    proposals = tf.placeholder(tf.float32, [POST_NMS_ROIS_TRAINING, 4])
    gt_class_ids = tf.placeholder(tf.float32, [MAX_GT_INSTANCES])
    gt_boxes = tf.placeholder(tf.float32, [MAX_GT_INSTANCES, 4])
    gt_masks = tf.placeholder(tf.float32, [height, width, MAX_GT_INSTANCES])
    config = namedtuple("config", ["TRAIN_ROIS_PER_IMAGE", "ROI_POSITIVE_RATIO",
                                   "BBOX_STD_DEV", "USE_MINI_MASK", "MASK_SHAPE"])
    config.TRAIN_ROIS_PER_IMAGE = 1024
    config.ROI_POSITIVE_RATIO = 0.7
    config.BBOX_STD_DEV = 1.0
    config.USE_MINI_MASK = True
    config.MASK_SHAPE = (7, 7)
    rois, roi_gt_class_ids, delta, masks = detection_targets_graph(
        proposals, gt_class_ids, gt_boxes, gt_masks, config)
    print("rois :")
    print(rois)
    print("roi_gt_class_ids :")
    print(roi_gt_class_ids)
    print("delta :")
    print(delta)
    print("masks :")
    print(masks)


def test_roi_gt_box_assignment():
    # Handle the empty case (no ground-truth boxes).
    num_proposals = 64
    num_gt_boxes = 32
    positive_count = 16
    overlaps = tf.placeholder(tf.float32, [num_proposals, num_gt_boxes])
    positive_indices = tf.placeholder(tf.int64, [positive_count])
    positive_overlaps = tf.gather(overlaps, positive_indices)
    print("positive_overlaps :")
    print(positive_overlaps)
    print(" tf.greater(tf.shape(positive_overlaps)[1], 0) :")
    print(tf.greater(tf.shape(positive_overlaps)[1], 0))
    roi_gt_box_assignment = tf.cond(
        tf.greater(tf.shape(positive_overlaps)[1], 0),
        true_fn=lambda: tf.argmax(positive_overlaps, axis=1),
        false_fn=lambda: tf.cast(tf.constant([]), tf.int64)
    )
    print("roi_gt_box_assignment :")
    print(roi_gt_box_assignment)


def test_overlaps_graph():
    N = 32
    boxes1 = tf.placeholder(tf.float32, [None, 4])
    boxes2 = tf.placeholder(tf.float32, [None, 4])
    overlaps_iou = overlaps_graph(boxes1, boxes2)
    print("overlaps_iou :")
    print(overlaps_iou)


def test_crop_and_resize():
    input_image = tf.placeholder(tf.float32, [None, 224, 224, 3])
    # The batch size should be smaller than num_boxes, so an aggregation
    # step is needed to merge the features of the same sample.
    num_boxes = 32
    # The boxes are not shared by all batch samples: the boxes of every
    # sample are set separately and flattened, so num_boxes >= batch.
    boxes = tf.placeholder(tf.float32, [num_boxes, 4])
    # Elements are in [0, batch).
    box_ind = tf.placeholder(tf.int32, [num_boxes])
    crop_size = (7, 7)
    crop_and_resize_ext = tf.image.crop_and_resize(
        image=input_image,
        boxes=boxes,
        box_ind=box_ind,
        crop_size=crop_size,
    )
    print("crop_and_resize_ext :")
    print(crop_and_resize_ext)


def test_PyramidROIAlign():
    batch = 32
    num_boxes = 64
    TOP_DOWN_PYRAMID_SIZE = 256
    feature_map_one = tf.placeholder(tf.float32, [batch, 224, 224, TOP_DOWN_PYRAMID_SIZE])
    feature_map_two = tf.placeholder(tf.float32, [batch, 128, 128, TOP_DOWN_PYRAMID_SIZE])
    feature_map_three = tf.placeholder(tf.float32, [batch, 64, 64, TOP_DOWN_PYRAMID_SIZE])
    feature_map_four = tf.placeholder(tf.float32, [batch, 32, 32, TOP_DOWN_PYRAMID_SIZE])
    feature_maps = [feature_map_one, feature_map_two, feature_map_three, feature_map_four]
    pool_shape = [7, 7]
    boxes = tf.placeholder(tf.float32, [batch, num_boxes, 4])
    # The image_meta length depends on num_classes; this sample keeps the
    # smallest size, i.e. 12.
    image_meta = tf.placeholder(tf.float32, [batch, 12])
    inputs = [boxes, image_meta] + feature_maps
    pyramidROIAlign_ext = PyramidROIAlign(pool_shape=pool_shape).call(inputs)


def test_ProposalLayer():
    from collections import namedtuple
    # Stand-in config; ProposalLayer in the referenced repo reads
    # config.IMAGES_PER_GPU.
    config = namedtuple("config", ["RPN_BBOX_STD_DEV", "PRE_NMS_LIMIT", "IMAGES_PER_GPU"])
    config.RPN_BBOX_STD_DEV = [1.0, 1.0, 1.0, 1.0]
    config.PRE_NMS_LIMIT = 128
    config.IMAGES_PER_GPU = 32  # equal to batch
    proposal_ext = ProposalLayer(proposal_count=32, nms_threshold=0.7, config=config)
    batch = config.IMAGES_PER_GPU
    num_anchors = 1024
    rpn_probs = tf.placeholder(tf.float32, [batch, num_anchors, 2])
    rpn_bbox = tf.placeholder(tf.float32, [batch, num_anchors, 4])
    anchors = tf.placeholder(tf.float32, [batch, num_anchors, 4])
    inputs = [rpn_probs, rpn_bbox, anchors]
    output = proposal_ext(inputs)
    print("inputs :")
    print(inputs)
    print("output :")
    print(output)


def test_batch_slice():
    # batch_slice applies the operation to the top-scoring items;
    # the number of items processed is limited by batch_size.
    input_0 = tf.placeholder(tf.float32, [None, 2])
    input_1 = tf.placeholder(tf.int32, [None])
    inputs = [input_0, input_1]
    output = utils.batch_slice(inputs=inputs,
                               graph_fn=lambda x, y: tf.gather(x, y),
                               batch_size=3)
    sess = tf.Session()
    output = sess.run(
        output,
        feed_dict={
            input_0: np.random.random(size=[10, 2]) - 0.5,
            input_1: np.random.randint(0, 2, size=[10])
        }
    )
    print("output :")
    print(output)
    print("output shape :")
    print(output.shape)


def test_clip_boxes_graph():
    window = tf.constant([500, 600, 700, 800], dtype=tf.float32)
    boxes = tf.placeholder(tf.float32, [None, 4])
    import numpy as np
    input_boxes = np.concatenate([np.random.randint(0, 800, size=[2, 2]),
                                  np.random.randint(0, 428, size=[2, 2])], axis=1)
    input_boxes[:, 2:] += input_boxes[:, :2]
    sess = tf.Session()
    clip_tensor = clip_boxes_graph(boxes, window)
    print("boxes :")
    print(input_boxes)