本文介绍了一个消息队列任务处理状态的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我工作的一个产品数据导入系统,它从外部来源下载的产品数据,转换成正确的模式,并存储结果 - 基本上是一个ETL系统。消息的核心类型的系统处理是ImportProductCommand,它指明了产品导入和源。导入命令然而,很少单独发送。一个典型的业务需求是从给定源导入了一整套产品。目前,这是pssed因为它可以指定要导入多个产品的ImportProductsCommand消息,EX $ P $。一个命令处理程序消耗此消息时,将其转换成单独的ImportProductCommand消息,并将它们发送到用于处理的队列中。个别的导入请求的消费者发布了ProductImportedEvent或ProductImportFailedEvent。当收到ImportProductsCommand消息,该服务分配一个GUID令牌到消息,将邮件上的队列,并返回该标记。然后令牌用作相关ID,以便从个人导入请求可以与批导入请求相关联。鉴于这种基础设施,也能够确定与给定令牌相关联的事件的数量,并因此进口产品或导入失败的数目。现在缺少的是一个明确的事件表明,批量导入已完成。个别的导入请求的处理程序是没有明确意识到,这是一个批量导入请求的一部分。当然,这可以通过知道许多产品是如何被导入并通过计数与特定相关ID关联的导入事件的数量来推断。当前实现利用了消息队列系统处理进程重新启动和失败,但不太明确一些批量导入请求。总体而言,系统需要回答查询是:

I am working on a product data importing system which downloads product data from external sources, translates it into the proper schema, and stores the results - essentially an ETL system. The core type of message the system handles is "ImportProductCommand" which specifies the product to import and the source. Import commands however, are rarely sent individually. A typical business requirement is to import a whole set of products from a given source. Currently, this is expressed as an "ImportProductsCommand" message which can specify multiple products to import. A command handler consumes this message, converts it into individual "ImportProductCommand" messages and sends them to a queue for processing. The consumer of individual import requests publishes a "ProductImportedEvent" or "ProductImportFailedEvent". When the "ImportProductsCommand" message is received, the service assigns a GUID token to the message, places the message on a queue, and returns the token. The token is then used as a correlation ID so that individual import requests can be associated with the batch import request. Given this infrastructure, it is possible to determine the number of events associated with a given token, and thus the number of imported products or failed imports. What is missing is an explicit event to indicate that a batch import has completed. The handler of individual import requests isn't explicitly aware that it is part of a batch import request. This of course can be inferred by knowing how many products were to be imported and by counting the number of import events associated with a specific correlation ID. The currently implementation leverages the message queue system to handle process restarts and failures, but is less explicit about the batch import request. Overall, the queries the system needs to answer are:

  • 是一个给定的批量导入做?
  • 有多少个人进口保持一个给定的批量导入?
  • 有多少个人进口完成?
  • 有多少是错误的?

什么是一些最佳做法或建议的方式来支持这些查询,仍然利用消息韧性排队系统?目前,有什么捆绑在一起是上面提到的道理,但目前还没有明确的记录,重新present批量导入请求的实体,如果有,那么个人导入请求处理器就需要知道这样的实体相应地更新状态。

What are some best practices or suggested approaches to support these queries and still leverage the message queuing system for resilience? Currently, what ties it all together is the token mentioned above, but there is no explicit record to represent the batch import request entity, and if there was, then the individual import request processor would need to be aware of such an entity to update the status accordingly.

所有这些使用C#,NServiceBus实施和托管为IIS WCF应用程序。

All of this is implemented using C#, NServiceBus and hosted as an IIS WCF application.

推荐答案

这可以实现为 NServiceBus佐贺 ImportProductsCommand 应该由佐贺处理(ImportProductsSaga ),和​​佐贺的数据可以有产品的数量要导入,因为它发出了 ImportProductCommand ImportProductsSaga 应该处理的 ProductImportedEvent ProductImportFailedEvent 。在每本次活动中的 ImportProductsSaga ,增量处理的 ProductsImported ProdctsFailedToImport 。同时检查(ProductsImported + ProdctsFailedToImport)之和等于ProdctsToBeImported,如果是这样,完成故事。

This can be implemented as NServiceBus Saga. ImportProductsCommand should be handled by a Saga(ImportProductsSaga), and Saga data can have the count of products to be imported as it sends the ImportProductCommand. ImportProductsSaga should handle ProductImportedEvent and ProductImportFailedEvent. On each of this event handled in ImportProductsSaga, increment ProductsImported or ProdctsFailedToImport. Also check the sum of (ProductsImported + ProdctsFailedToImport) equals to ProdctsToBeImported, if so, complete the saga.

ImportProductsSaga数据需要保持无ImportProductCommand轨道发送和回复收到,你可以计算出待处理的回复等佐贺数据看起来像下面例如:

ImportProductsSaga data needs to keep track of No of ImportProductCommand send, and reply received and you can calculate the pending reply etc. Saga data look something like following e.g:

   public class ImportProductsSataData{ 
       public Guid Id {get; set}
       public int ProdctsToBeImported {get; set}
       public int ProdctsImported {get; set}
       public int ProdctsFailedToImport {get; set}
}

这篇关于一个消息队列任务处理状态的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-19 01:43