前面我们的EventBus已经弄好了,那么接下来通过EventBus来实现我们的消息推送就是自然而然的事情了。
说到消息推送,很多人肯定会想到Websocket,既然我们使用Asp.net core,那么SignalR肯定是我们的首选。
接下来就用SignalR来实现我们的消息实时推送。

NotificationHub

首选我们需要创建一个Hub,用于连接SignalR。
添加NotificationHub类继承SignalR.Hub

using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Localization;
using Wheel.Notifications;

namespace Wheel.Hubs
{
    public class NotificationHub : Hub
    {
        protected IStringLocalizer L;

        public NotificationHub(IStringLocalizerFactory localizerFactory)
        {
            L = localizerFactory.Create(null);
        }

        public override async Task OnConnectedAsync()
        {
            if (Context.UserIdentifier != null)
            {
                var wellcome = new NotificationData(NotificationType.WellCome)
                    .WithData("name", Context.User!.Identity!.Name!)
                    .WithData("message", L["Hello"].Value);
                await Clients.Caller.SendAsync("Notification", wellcome);
            }
        }
    }
}

这里重写OnConnectedAsync,当用户授权连接之后,立马推送一个Hello的消息。

约定消息通知结构

为了方便并且统一结构,我们最好约定一组通知格式,方便客户端处理消息。
创建一个NotificationData类:

namespace Wheel.Notifications
{
    public class NotificationData
    {
        public NotificationData(NotificationType type)
        {
            Type = type;
        }

        public NotificationType Type { get; set; }

        public IDictionary<string, object> Data { get; set; } = new Dictionary<string, object>();

        public NotificationData WithData(string name, object value) 
        {
            Data.Add(name, value);
            return this;
        }
    }
    public enum NotificationType
    {
        WellCome = 0,
        Info = 1,
        Warn = 2,
        Error = 3
    }
}

NotificationData包含消息通知类型Type,以及消息数据Data。

自定义UserIdProvider

有时候我们可以能需要自定义用户表示,那么就需要实现一个自定义的IUserIdProvider。

using Microsoft.AspNetCore.SignalR;
using System.Security.Claims;
using Wheel.DependencyInjection;

namespace Wheel.Hubs
{
    public class UserIdProvider : IUserIdProvider, ISingletonDependency
    {
        public string? GetUserId(HubConnectionContext connection)
        {
            return connection.User?.Claims?.FirstOrDefault(a=> a.Type == ClaimTypes.NameIdentifier)?.Value;
        }
    }
}

配置SignalR

在Program中我们需要注册SignalR以及配置SignalR中间件。
添加代码:

builder.Services.AddAuthentication(IdentityConstants.BearerScheme)
    .AddBearerToken(IdentityConstants.BearerScheme, options =>
    {
        options.Events = new BearerTokenEvents
        {
            OnMessageReceived = context =>
            {
                var accessToken = context.Request.Query["access_token"];
                // If the request is for our hub...
                var path = context.HttpContext.Request.Path;
                if (!string.IsNullOrEmpty(accessToken) &&
                    (path.StartsWithSegments("/hubs")))
                {
                    // Read the token out of the query string
                    context.Token = accessToken;
                }
                return Task.CompletedTask;
            }
        };
    });

builder.Services.AddSignalR()
    .AddJsonProtocol()
    .AddMessagePackProtocol()
    .AddStackExchangeRedis(builder.Configuration["Cache:Redis"]);

在AddBearerToken配置从Query中读取access_token,用于SignalR连接是从Url获取认证的token。
这里注册SignalR并支持JSON和二进制MessagePackProtocol协议。
AddStackExchangeRedis表示用Redis做Redis底板,用于横向扩展。
配置中间件

app.MapHub<NotificationHub>("/hubs/notification");

就这样完成了我们SignalR的集成。

配合EventBus进行推送

有时候我们有些任务可能非实时响应,等待后端处理完成后,再给客户端发出一个消息通知。或者其他各种消息通知的场景,那么配合EventBus就可以非常灵活了。
接下来我们来模拟一个测试场景
创建NotificationEventData

using MediatR;

namespace Wheel.Handlers
{
    public class NotificationEventData : INotification
    {
        public string Message { get; set; }
    }
}

创建NotificationEventHandler

using Microsoft.AspNetCore.SignalR;
using Wheel.EventBus.Local;
using Wheel.Hubs;
using Wheel.Notifications;

namespace Wheel.Handlers
{
    public class NotificationEventHandler : ILocalEventHandler<NotificationEventData>
    {
        private readonly IHubContext<NotificationHub> _hubContext;

        public NotificationEventHandler(IHubContext<NotificationHub> hubContext)
        {
            _hubContext = hubContext;
        }

        public async Task Handle(NotificationEventData eventData, CancellationToken cancellationToken = default)
        {
            var wellcome = new NotificationData(NotificationType.WellCome)
                .WithData(nameof(eventData.Message), eventData.Message);
            await _hubContext.Clients.All.SendAsync("Notification", wellcome);
        }
    }
}

创建NotificationController

using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Wheel.Handlers;

namespace Wheel.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    [AllowAnonymous]
    public class NotificationController : WheelControllerBase
    {
        [HttpGet]
        public async Task<IActionResult> Test()
        {
            await LocalEventBus.PublishAsync(new NotificationEventData { Message = Guid.NewGuid().ToString() });
            return Ok();
        }
    }
}

启动项目
先获取一个token
造轮子之消息实时推送-LMLPHP
然后用搞一个SignalR客户端连接

using Microsoft.AspNetCore.SignalR.Client;
using System.Text.Json;
using System.Text.Json.Serialization;

var connection = new HubConnectionBuilder()
    .WithUrl("https://localhost:7080/hubs/notification?access_token=CfDJ8PRWI6x4TXdPnDiVcuLDwVtyEhzhaNmV9ggxR0_i0_godBkw1wRkg0ct0DezjpwbJb7s6VJxvr3V8mEGE9d9klp_Bhjv2AZE3eQ78KmJygizroSpfFHeoImRaEYIyLNXkHrNEG-MuszVQ6eVFHORm5Kkv-Rux7_1RkVam0tsPYiypRQhcJqUuV3pbeiblOQpJ1WXikmpZ8-jFSqwkNMSBhUx2w50iTWYiEyqpiyrjQqu69NfEregcwxJBOji4dmxiu1Q4tyaFZMyZ3m10tFrSqHuF0cRBXDUf5BHSBGg0b7LImROubDrn5y_ogBmhd3J165gnbjRDnGvmYr6hQjI1ZmfhR_NyriG9zQ7jE5oZDFIUsXgd0Yqod8HTMlTzxY0gSFglPy-vPhzBVD4-WxRSaCtCaReQHVJUZ-SB15cfmvHXdPN9tjsVlMwlK8nWCuPJmnWdgsfEx8QJisPvfzhH_dosPvFQf1nNH3Gz_9NT858SauuXCXj3AKE48Bh4XY6avpO4GFEdlMgYHmCius1BEqlq8KQB9SVuJFLcvhKt0Xbz_TEYiN0LtBC7Ot4FNOvBOy0a9VswuYII_nAMgnRN4dZTz8z8vNS7Yd1zbDY6mL86OuqvhMhEgzEpgkjhdaBvq13fDTtGKmw6bZXLstYH_kDaXGKxzfP38WSoxZ9EI8LyPpoZzhqUeexEGbwhYRWM9zNFH_wvwUGMUvWne4_ZeVqVir8obns496infwK9x4WCfL91YC7_ac7Q7t5HLg9py_NBXmsHXXrs_2kdA5F6DI")
    .Build();

connection.On<NotificationData>("Notification", (data) =>
{
    var newMessage = JsonSerializer.Serialize(data);
    Console.WriteLine($"{DateTime.Now}---{newMessage}");
});
await connection.StartAsync();

Console.ReadKey();
public class NotificationData
{
    public NotificationData(NotificationType type)
    {
        Type = type;
    }

    public NotificationType Type { get; set; }

    public IDictionary<string, object> Data { get; set; } = new Dictionary<string, object>();

    public NotificationData WithData(string name, object value)
    {
        Data.Add(name, value);
        return this;
    }
}
public enum NotificationType
{
    WellCome = 0,
    Info = 1,
    Warn = 2,
    Error = 3
}

启动程序,由于我们带了accessToken连接,所以连上立马就收到Hello的消息推送。
造轮子之消息实时推送-LMLPHP
调用API发起推送通知。
造轮子之消息实时推送-LMLPHP
可以看到成功接收到了消息通知。
对接非常容易且灵活。

就这样我们轻轻松松完成了消息实时通知的功能集成。

轮子仓库地址https://github.com/Wheel-Framework/Wheel
欢迎进群催更。

造轮子之消息实时推送-LMLPHP

10-13 22:57