我有一个模块,它依赖于对外部服务的调用来填充缓存,如下所示:

func (provider *Cache) GetItem(productId string, skuId string, itemType string) (*Item, error) {

    // First, create the key we'll use to uniquely identify the item
    key := fmt.Sprintf("%s:%s", productId, skuId)

    // Now, attempt to get the concurrency control associated with the item key
    // If we couldn't find it then create one and add it to the map
    var once *sync.Once
    if entry, ok := provider.lockMap.Load(key); ok {
        once = entry.(*sync.Once)
    } else {
        once = &sync.Once{}
        provider.lockMap.Store(key, once)
    }

    // Now, use the concurrency control to attempt to request the item
    // but only once. Channel any errors that occur
    cErr := make(chan error, 1)
    once.Do(func() {

        // We didn't find the item in the cache so we'll have to get it from the partner-center
        item, err := provider.client.GetItem(productId, skuId)
        if err != nil {
            cErr <- err
            return
        }

        // Add the item to the cache
        provider.cache.Store(key, &item)
    })

    // Attempt to read an error from the channel; if we get one then return it
    // Otherwise, pull the item out of the cache. We have to use the select here because this is
    // the only way to attempt to read from a channel without it blocking
    var sku interface{}
    select {
    case err, ok := <-cErr:
        if ok {
            return nil, err
        }
    default:
        item, _ = provider.cache.Load(key)
    }

    // Now, pull out a reference to the item and return it
    return item.(*Item), nil
}

这种方法可以正常工作。我的问题是测试;专门测试以确保对于给定的key值仅调用GetItem方法一次。我的测试代码如下:
var _ = Describe("Item Tests", func() {

    It("GetItem - Not cached, two concurrent requests - Client called once", func() {

        // setup cache

        // Setup a wait group so we can ensure both processes finish
        var wg sync.WaitGroup
        wg.Add(2)

        // Fire off two concurrent requests for the same SKU
        go runRequest(&wg, cache)
        go runRequest(&wg, cache)
        wg.Wait()

        // Check the cache; it should have one value
        _, ok := cache.cache.Load("PID:SKUID")
        Expect(ok).Should(BeTrue())

        // The client should have only been requested once
        Expect(client.RequestCount).Should(Equal(1)) // FAILS HERE
    })
})

// Used for testing concurrency
func runRequest(wg *sync.WaitGroup, cache *SkuCache) {
    defer wg.Done()
    sku, err := cache.GetItem("PID", "SKUID", "fakeitem")
    Expect(err).ShouldNot(HaveOccurred())
}

type mockClient struct {
    RequestFails    bool
    RequestCount    int
    lock            sync.Mutex
}

func NewMockClient(requestFails bool) *mockClient {
    return &mockClient{
        RequestFails:    requestFails,
        RequestCount:    0,
        lock:            sync.Mutex{},
    }
}

func (client *mockClient) GetItem(productId string, skuId string) (item Item, err error) {
    defer GinkgoRecover()

    // If we want to simulate client failure then return an error here
    if client.RequestFails {
        err = fmt.Errorf("GetItem failed")
        return
    }

    // Sleep for 100ms so we can more accurately simulate the request latency
    time.Sleep(100 * time.Millisecond)

    // Update the request count
    client.lock.Lock()
    client.RequestCount++
    client.lock.Unlock()

    item = Item{
        Id:              skuId,
        ProductId:       productId,
    }

    return
}

我一直遇到的问题是该测试有时会失败,因为在注释行中,请求计数为2时(预期为1)。此故障不一致,因此我不确定如何调试此问题。任何帮助将不胜感激。

最佳答案

我认为您的测试有时会失败,因为您的缓存无法保证它只能提取一次项目,因此您很幸运测试能够捕获这一点。

如果其中没有一个项目,并且有2个并发的goroutine同时调用Cache.GetItem(),则有可能lockMap.Load()会报告这两个键均不在 map 中,这两个goroutine都会创建一个sync.Once,并且两者都将存储自己的实例在 map 中(显然只有后者(后者)会保留在 map 中,但您的缓存不会对此进行检查)。

然后,这两个goroutine都会调用client.GetItem(),因为2个单独的sync.Once不提供同步。仅当使用相同的sync.Once实例时,才保证传递给Once.Do()的函数仅执行一次。

我认为sync.Mutex会更容易且更合适,以避免在此处创建和使用2 sync.Once

或者因为您已经在使用sync.Map,所以可以使用 Map.LoadOrStore() 方法:创建一个sync.Once,并将其传递给Map.LoadOrStore()。如果键已在 map 中,请使用返回的sync.Once。如果 key 不在 map 中,则您的sync.Once将存储在其中,因此您可以使用它。这将确保没有多个并发goroutine可以在其中存储多个sync.once实例。

像这样:

var once *sync.Once
if entry, loaded := provider.lockMap.LoadOrStore(key, once); loaded {
    // Was already in the map, use the loaded Once
    once = entry.(*sync.Once)
}

此解决方案仍然不是完美的:如果2个goroutine同时调用Cache.GetItem(),则只有一个会尝试从客户端获取项目,但如果失败,则仅此goroutine将报告错误,而另一个goroutine将不会尝试获取该项目来自客户端,但会从 map 加载该项目,您无需检查加载是否成功。您应该并且,如果它不在 map 中,则意味着另一次并发尝试无法获取它。因此,您应该随后报告错误(并清除sync.Once)。

如您所见,它变得越来越复杂。我坚持我以前的建议:在这里使用sync.Mutex会更容易。

关于go - 在Golang中测试一次条件,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/61110211/

10-15 20:36