C#开源即时通讯GGTalk

C#开源即时通讯GGTalk

GGTalk 对需要频繁查询数据库的数据做了服务端全局缓存处理,这样做一来大大降低了数据库的读取压力,二来在客户端的请求到来时,服务端能更快地响应,极大地提升了用户体验。这篇文章将会详细剖析关于 GGTalk 服务端全局缓存的设计与实现。还没有GGTalk源码的朋友,可以到 GGTalk源码下载中心 下载。

一. GGTalk 服务端三大核心

首先,我们需要了解 GGTalk服务端 的三大核心,其分别是:

  • 消息处理:处理来自客户端的消息;
  • 全局缓存:将用户和群组的数据缓存在内存中;
  • 数据库交互:对数据库中的信息进行增删改查。

1. 消息处理

当一个客户端的请求进来时,首先会进入消息处理环节,根据用户传递的消息号,进入不同的逻辑分支。以修改用户信息为例:

//(客户端逻辑代码)
/// <summary>
/// 修改个人资料。
/// </summary>        
public void ChangeMyBaseInfo(string name, string signature, string department) {
  //...
  this.rapidPassiveEngine.SendMessage(null, this.talkBaseInfoTypes.ChangeMyBaseInfo, data, "", true);
  //...
}

当一个用户信息被修改时,会调用如上方法,然后通过调用 rapid客户端引擎 上的 SendMessage 方法发送一条消息(其中 data 为用户信息的 byte[]数组)。

//(服务端逻辑代码)
public void Initialize() {
//...
  this.rapidServerEngine.MessageReceived += new ESBasic.CbGeneric<string,ClientType, int, byte[], string>(rapidServerEngine_MessageReceived);
//...
}

客户端发送消息会触发 rapid服务端引擎 上的 MessageReceived 事件,最终程序流程会来到如下图的地方。

GGTalk 开源即时通讯系统源码剖析之:服务端全局缓存-LMLPHP

根据客户端传递在消息号来匹配对应的 if分支,然后进行对应的处理。

2. 全局缓存

接着前面修改用户信息的例子:

if (informationType == this.talkBaseInfoTypes.ChangeMyBaseInfo) {
//...
  this.serverGlobalCache.UpdateUserInfo(sourceUserID, contract.Name, contract.Signature, contract.OrgID);
  TUser user = this.serverGlobalCache.GetUser(sourceUserID);
//...
}

消息处理后会来到如上 if分支,其中分别调用了 serverGlobalCache 上的 UpdateUserInfoGetUser 方法,下面是这两个方法的具体实现。

/// <summary>
/// 获取目标用户,如果缓存中不存在,则从DB加载。
/// </summary>        
public TUser GetUser(string userID) {
  TUser user = this.userCache.Get(userID);
  if (user == null) {
    user = this.dbPersister.GetUser(userID);
    if (user != null) {
      this.userCache.Add(userID, user);
    }
  }
  return user;
}

此方法会从全局缓存获取用户数据,若缓存中不存在,则会从数据库中查询,并将查询到的用户数据存入缓存中,方法最终返回用户数据。

// 更新用户信息
public void UpdateUserInfo(string userID, string name, string signature, string orgID) {
  TUser user = this.GetUser(userID);
  if (user == null) {
    return;
  }
  user.Name = name;
  user.Signature = signature;
  user.OrgID = orgID;
  user.Version += 1;
  user.DeletePartialCopy();
  this.dbPersister.UpdateUserInfo(userID, name, signature, orgID, user.Version);
}

此方法先去获取用户的信息,修改用户信息,然后通过调用 user 上的 DeletePartialCopy 方法清除用户的缓存,最后再更新数据库中用户的信息。

3. 数据库交互

同样在这个修改用户信息的例子中,在前面的讲解中有涉及到两处与数据库的交互,分别是 GetUserUpdateUserInfo 方法的调用。下面是这两个方法的具体实现:

// 获取用户信息
public GGUser GetUser(string userID) {
  GGUser user = null;
  user = db.Queryable<GGUser>().Where(it => it.UserID == userID).First();
  return user;
}
// 更新用户信息
public void UpdateUserInfo(string userID, string name, string signature, string orgID, int version) {
  db.Updateable<GGUser>(it => new GGUser() { Signature = signature, Name = name, OrgID = orgID, Version = version }).Where(it => it.UserID == userID).ExecuteCommand();
}

在数据库的交互环节,我们使用的是 sqlsugar 来操作数据库(这是一个开源的ORM框架,若想了解其详细用法,请移步sqlsugar文档)。

二. 服务端全局缓存

1. 代码位置

GGTalk 开源即时通讯系统源码剖析之:服务端全局缓存-LMLPHP

由于在 GGTalk服务端 中对用户和群组信息查询过于频繁,故而 GGTalk 将用户和群组的信息缓存在服务端内存之中,进而达到减少资源消耗和更快的服务端响应的好处,但这样做同时也会增加编码的复杂度,那么 GGTalk 是如何在其中进行取舍的呢?下面将介绍具体实现。

2. ServerGlobalCache类

public class ServerGlobalCache<TUser, TGroup>
  where TUser : TalkBase.IUser
  where TGroup : TalkBase.IGroup
{
  private IDBPersister<TUser, TGroup> dbPersister;
  //...
  private ObjectManager<string, TUser> userCache = new ObjectManager<string, TUser>(); // key:用户ID 。 Value:用户信息
  private ObjectManager<string, TGroup> groupCache = new ObjectManager<string, TGroup>();  // key:组ID 。 Value:Group信息 
  //...
}

ServerGlobalCache类 就是 GGTalk 服务端全局缓存的核心实现了,这个类接受两个泛型参数,TUserTGroup,并要求TUser必须是TalkBase命名空间中的IUser接口的实现类或子类。TGroup必须是TalkBase命名空间中的IGroup接口的实现类或子类。

  • IUser:用户基础接口,定义了关于用户一系列的属性和方法。
/// <summary>
/// 用户基础接口。
/// </summary>
public interface IUser : IUnit {
  List<string> GroupList { get; }
  UserStatus UserStatus { get; set; }
  string GetFriendCatalog(string friendID);
  string GetUnitCommentName(string unitID);
  string Signature { get; set; }     
  string OrgID { get; set; }
  /// <summary>
  /// 用户使用状态
  /// </summary>
  UserState UserState { get; set; }
  bool IsFriend(string userID);
  List<string> GetAllFriendList();
  void ChangeHeadImage(int defaultHeadImageIndex, byte[] customizedHeadImage);
  DateTime PcOfflineTime { get; set; }
  DateTime MobileOfflineTime { get; set; }
}
  • IGroup:群/讨论组的基础接口,定义了一系列关于群/讨论组的属性和方法。
/// <summary>
/// 群、讨论组 基础接口。
/// </summary>
public interface IGroup : IUnit {
  GroupType GroupType { get; }
  string CreatorID { get; }
  DateTime CreateTime { get; }
  List<string> MemberList { get; }
  void AddMember(string userID);
  void RemoveMember(string userID);
  string Announce { get; set; }
  void ChangeMembers(List<string> members);
}

除了这两个泛型参数外,我们可以发现 ServerGlobalCache类 中还有三个字段,这三个字段是 ServerGlobalCache类 中所有方法的核心,其作用如下:

  • dbPersister:与数据库进行交互;
  • userCache:与用户缓存相关;
  • groupCache:与群组缓存相关。

关于这三个字段,在后面的具体场景会展开更加详细的介绍。

3. 缓存数据的实现

关于服务端缓存,最关键的就是 userCachegroupCache 字段了,其中 userCache 用于缓存用户的信息;而 groupCache 用于缓存群组的信息。

首先我们来看关于这两个字段的类型ObjectManager
public class ObjectManager<TPKey, TObject>

ObjectManager是对Dictionary的二次封装,支持多线程安全,使用起来也更方便。这个类接受两个泛型参数,我们通过传入不同的泛型可以实现不同数据的管理(在 GGTalk服务端 中,仅管理了用户和群组的数据)。

其内部的Dictionary就是用来将用户或群组的数据存储在内存中,达到缓存数据的目的。

4. 将数据库中数据的读入内存(缓存数据)

我们来看 ServerGlobalCache类 中如下两个方法:

GGTalk 开源即时通讯系统源码剖析之:服务端全局缓存-LMLPHP

从名字上来看我们很容易就知道这两个方法的意思,预加载用户数据和预加载群组数据,这两个方法的主要作用就是将数据库中用户和群组的数据加载到内存中。首先通过 dbPersister 字段来从数据库中查询到所有用户和群组数据,通过foreach遍历,分别调用userCachegroupCache上的Add方法将每一条数据存储到前面提到的objectDictionary字段中,也是就存储在了服务端程序运行时的内存里面。

5. 数据库增删改查

看到这里,关于 ServerGlobalCache类 的基础设施你已经了解的七七八八了,接下来都是基于这些基础设施而实现的方法了。在这里我要纠正一个你可能感到疑惑的点,本篇文章不是介绍服务端缓存吗,这里怎么扯到数据库的增删改查呢?

因为往往数据缓存和数据源之间存在着一些联动,所以 ServerGlobalCache类 的作用不仅仅是缓存数据,同时也存在大量获取数据库中的数据的方法,这也是为什么在类里面会有一个dbPersister字段,当然关于具体从数据库中读取数据的方法不在这个类里边(回顾 GGTalk三大核心)。

接下来,让我们看看 ServerGlobalCache类 还有什么:

GGTalk 开源即时通讯系统源码剖析之:服务端全局缓存-LMLPHP

我们可以看到,这些折叠的部分的代码行数几乎占据了 ServerGlobalCache类 的百分之九十,这是正是对数据库和数据缓存的操作,每个折叠代码块的注释都对应着 GGTalk数据库 的一张表。

接下来我们主要分析一下关于用户和群组的部分操作,看看 GGTalk服务端 是如何对数据库和数据缓存进行操作的。

首先来看一个简单的,添加新用户操作:

/// <summary>
/// 插入一个新用户。
/// </summary>      
public void InsertUser(TUser user) {
  this.userCache.Add(user.ID, user);
  this.dbPersister.InsertUser(user);
}

这个方法接受一个TUser类型的参数,参数中包含用户的必要信息,然后分别添加到用户缓存和插入到数据库中。

接下来,再看最开始讲三大核心的那个例子:

/// <summary>
/// 获取目标用户,如果缓存中不存在,则从DB加载。
/// </summary>        
public TUser GetUser(string userID) {
  TUser user = this.userCache.Get(userID);
  if (user == null) {
    user = this.dbPersister.GetUser(userID);
    if (user != null) {
      this.userCache.Add(userID, user);
    }
  }
  return user;
}

现在再来看是不是很清晰了呢,这个方法用于查询单个用户,接受一个用户ID,首先会从用户缓存中查找这个用户,如果缓存中不存在,则从数据库中查找,在用户存在的情况下再将其存入内存之中。

接下来再分析两个关于群组操作的方法。

1、根据群组ID获取群组信息:

/// <summary>
/// 获取某个组
/// </summary>       
public TGroup GetGroup(string groupID) {
  TGroup group = this.groupCache.Get(groupID);
  if (group == null) {
    group = this.dbPersister.GetGroup(groupID);
    if (group != null) {
      this.groupCache.Add(groupID, group);
    }
  }
  return group;
}

和获取用户信息类似,此方法首先会在群组缓存中查找对应ID的群组,若群组不存在,则会从数据库读取对应ID的群组,并且在群组存在的情况下将其存入内存之中。

2、解散群组操作

public void DeleteGroup(string groupID) {
  TGroup group = this.GetGroup(groupID);
  if (group == null) {
    return;
  }
  foreach (string userID in group.MemberList) {
    TUser user = this.GetUser(userID);
    if (user != null) {
      user.QuitGroup(groupID);
      this.dbPersister.UpdateUserGroups(user);
    }
  }
  this.groupCache.Remove(groupID);
  this.dbPersister.DeleteGroup(groupID);
  this.dbPersister.DeleteAddGroupRequest(groupID);
}

这个方法接受群组ID作为参数,首先会调用GetCroup方法依次从内存和数据库中读取关于目标群组的数据(如果缓存中没有的话)。若群组存在,则从群组的MemberList属性中遍历用户ID,再通过GetUser方法查询用户数据,通过用户的QuitGroup退出群组,然后在数据库中更新用户的信息。在这个群组中的每一个存在的用户都退出群组后,从群组缓存中清除该群组的数据。然后再同步数据库中的群组表的数据,以及在数据库中申请加入群组表中删除加入此群组的记录。

三. 结语

将数据库中的数据缓存在内存中是一把双刃剑,若是将大量的数据保存在内存中,这会大大加大内存的占用,存在程序因为内存不足而导致程序崩溃的风险。如何避免这样的事情发生,这要求我们对内存保持足够的敏感。最后,希望这篇文章能够对你有所帮助。
在接下来的一篇我们将介绍GGTalk服务端的虚拟数据库。
敬请期待:《GGTalk 开源即时通讯系统源码剖析之:虚拟数据库》

07-06 22:14