有什么不能一笑而过呢

有什么不能一笑而过呢

C# 重新认识一下 IEnumerable<T>,IAsyncEnumerable<T> 以及搭配异步可能遇到的问题

前言

为啥会想到写这个
为了这碟醋,包了这顿饺子
作为老鸟不免犯迷糊
因为 在使用异步中使用IEnumerable<T>,IAsyncEnumerable<T>遇到了一些细节(对于我之前来说)上没注意到问题.

什么是IEnumerable<T>

IEnumerable<T> 继承自 System.Collections.IEnumerable


namespace System.Collections.Generic
{
    //
    // 摘要:
    //     Exposes the enumerator, which supports a simple iteration over a collection of
    //     a specified type.
    //
    // 类型参数:
    //   T:
    //     The type of objects to enumerate.
    public interface IEnumerable<out T> : IEnumerable
    {
        //
        // 摘要:
        //     Returns an enumerator that iterates through the collection.
        //
        // 返回结果:
        //     An enumerator that can be used to iterate through the collection.
        IEnumerator<T> GetEnumerator();
    }
}

以下引用自 微软官方文档

粗俗的说,就是我们可以通过实现了 IEnumerable<T> 接口的容器提高数据处理的效率,因为通过它 我们可以方便的使用 foreach 关键字 遍历容器内的元素,而我们所熟知的大部分的容器,例如,List<T>,Dictionary<TKey,TValue> 等等都是实现了 IEnumerable<T> 的.

除了快速遍历以外,作为返回值 IEnumerable<T> 也有着强大的优势,因为如果是传统的数组遍历的话如果我想要找到多个数组中指定的元素,我必须等到找到所有符合的元素的时候才能将数据返回,调用方才能开始进行操作,而返回结果为 IEnumerable<T> 的方法可以通过 yield 关键字提前将当前符合条件的 T 值返回给调用方然后返回到之前执行的地方继续查找符合条件的元素.

使用方式

1. 通过 GetEnumerator() 方法访问成员元素

IEnumerable和IEnumerable<T>接口提供了GetEnumerator()方法让我们获取迭代器,通过MoveNext()方法返回的bool值提供是否可以进行下一次迭代,然后通过Current属性获取当前元素.


    // 快速生成0-100, Enumerable 提供了很多方便的静态方法
    IEnumerable<int> arr = Enumerable.Range(0, 100);

    var enumerator = arr.GetEnumerator();

    while(enumerator.MoveNext())
    {
        enumerator.Current.Dump();
    }

2. 通过 foreach 关键字快速遍历成员元素

foreach关键字提供了快速遍历成员元素的操作,其也是通过生成第一个例子的代码迭代,省去了反复书写冗余代码的步骤.


    IEnumerable<int> arr = Enumerable.Range(0, 100);    
    
    // 遍历打印成员
    foreach (int element in arr)
    {
        Console.WriteLine(arr.ToString());  
    }

3. 作为同步方法返回值时通过 yield 关键字即时返回成员

当使用IEnumerable<T>作为同步方法的返回值时,我们可以对外隐藏返回值具体的实现,比如List<T> 实现了IEnumerable<T>,Dictionary<TKey,TValue>实现了IEnumerable<KeyValuePair<TKey,TValue>>.

当需要返回值时,方法内可以是一个整体结果返回,也可以利用yield关键字逐个成员结果返回.


    public void Main(string[] args)
    {
        // 通过IEnumerable<char> 逐个char 打印
        foreach (var task in GetTasksFromIEnumerable(5))
        {
            Console.WriteLine(task);
            Console.WriteLine($"处理完:{task}");
        }

        IEnumerable<int> GetTasksFromIEnumerable(int count)
        {
            for (int i = 0; i < count; i++)
            {
                yield return HeavyTask(i);
                Console.WriteLine($"已返回当前值:{i},准备下一次");
            }
        }

        // 模拟比较重的任务
        int HeavyTask(int i)
        {
            // 模拟耗时
            Thread.Sleep(1000);

            return i;
        }
    }

以上代码我们可以得到以下输出,可以看到每次调用方当前循环体结束后,迭代器又会回到当前运行的地方准备执行下一次迭代;

0
处理完:0
已返回当前值:0,准备下一次
1
处理完:1
已返回当前值:1,准备下一次
2
处理完:2
已返回当前值:2,准备下一次
3
处理完:3
已返回当前值:3,准备下一次
4
处理完:4
已返回当前值:4,准备下一次

4. 作为异步方法返回值时通过 yield 关键字即时返回成员

在如今异步方法大行其道的今天,我们的实际使用中异步方法已经稀疏平常了,但 C# 中的异步方法关键字 async , await 具有传染性,只有我们方法中使用到了异步方法并希望使用 await 等待结果的时候当前的方法必须使用 async 关键字标记并且将返回值使用 Task<T> 包裹.所以,通过正常途径我们无法获得一个只返回 IEnumerable<T> 结果的异步方法,因为它始终被 Task 包裹,除非我们在方法中等待所有的结果完成后作为异步方法的结果返回,但显然这不是我们希望的结果.那么我们如何才能希望和同步方法中一样即时返回当前的结果且不阻塞呢? 答案是使用它的异步类型接口 IAsyncEnumerable<T>.

可以使整个结果返回,无法将单个结果即时返回

    public async Task<IEnumerable<int>> GetNumbersAsync()
    {
        // 模拟需要执行的异步任务
        await Task.Delay(1000);

        var result = Enumerable.Range(0, 100);

        return result; //  ✔ 返回整个结果
    }
    public async Task<IEnumerable<int>> GetNumbersAsync()
    {
        for(int i = 0; i < 5 ; i++ )
        {
            yield return await GetSignleNumberAsync(); //  ❌ 编译错误 

            //CS1624: The body of 'GetNumbersAsync()' cannot be an iterator block because 'Task<IEnumerable<int>>' is not an iterator interface type
        }
    }

5. IAsyncEnumerable<T>

当使用 IAsyncEnumerable<T> 时异步方法的返回值可以直接使用它作为返回值的类型例如


    public async Task Main(string[]args)
    {
        Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}");

        // 通过await foreach 立即进行迭代
        await foreach (var number in GetNumbersAsync())
        {
            Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}");
            Console.WriteLine(number);
        }
    }

    async IAsyncEnumerable<int> GetNumbersAsync()
    {
        for (int i = 0; i < 5; i++)
        {
            yield return await GetSignleNumberAsync(); //  ✔ 编译通过
        }
    }

    async Task<int> GetSignleNumberAsync()
    {
        // 模拟耗时
        await Task.Delay(1000);

        return Random.Shared.Next();
    }

得到输出结果

当前线程:1
当前线程:6
809282356
当前线程:6
696341357
当前线程:6
872147671
当前线程:6
791323674
当前线程:6
1961595625
当前线程:6

我们也可以通过 ToBlockingEnumerable() 方法将对应的 IAsyncEnumerable<int> 的结果转为同步阻塞的 IEnumerable<T>


// 通过 ToBlockingEnumerable 转为同步阻塞的 IEnumerable<T>
var result = GetNumbersAsync().ToBlockingEnumerable();

// 将以同步代码执行
Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}");
foreach (var element in result)
{ 
    Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}");
    Console.WriteLine(element);
}

得到以下输出结果

当前线程:1
当前线程:1
1933649614
当前线程:1
1975509029
当前线程:1
1303323564
当前线程:1
1618007076
当前线程:1
503278324

IEnumerable

我们可以通过 sharplab.io 这个网站来看看 通过 yield + foreach 关键字为我们生成最终的代码的样子

源代码

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class C 
{
    public void M() 
    {
        foreach(var item in GetTasksFromIEnumerable(15))
        {
            Console.WriteLine(item);
        }
    }
    
    IEnumerable<int> GetTasksFromIEnumerable(int count)
    {
        for (int i = 0; i < count; i++)
        {
            yield return HeavyTask(i);
            Console.WriteLine($"已返回当前值:{i},准备下一次");
        }
    }

    // 模拟比较重的任务
    int HeavyTask(int i)
    {
        // 模拟耗时
        Thread.Sleep(1000);

        return i;
    }
}

生成后的代码


// 省略部分无关代码 
public class C
{
    [CompilerGenerated]
    private sealed class <GetTasksFromIEnumerable>d__1 : IEnumerable<int>, IEnumerable, IEnumerator<int>, IEnumerator, IDisposable
    {
        private int <>1__state;

        private int <>2__current;

        private int <>l__initialThreadId;

        private int count;

        public int <>3__count;

        public C <>4__this;

        private int <i>5__1;

        int IEnumerator<int>.Current
        {
            [DebuggerHidden]
            get
            {
                return <>2__current;
            }
        }

        object IEnumerator.Current
        {
            [DebuggerHidden]
            get
            {
                return <>2__current;
            }
        }

        [DebuggerHidden]
        public <GetTasksFromIEnumerable>d__1(int <>1__state)
        {
            this.<>1__state = <>1__state;
            <>l__initialThreadId = Environment.CurrentManagedThreadId;
        }

        [DebuggerHidden]
        void IDisposable.Dispose()
        {
        }

        private bool MoveNext()
        {
            int num = <>1__state;
            if (num != 0)
            {
                if (num != 1)
                {
                    return false;
                }
                <>1__state = -1;
                DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(13, 1);
                defaultInterpolatedStringHandler.AppendLiteral("已返回当前值:");
                defaultInterpolatedStringHandler.AppendFormatted(<i>5__1);
                defaultInterpolatedStringHandler.AppendLiteral(",准备下一次");
                Console.WriteLine(defaultInterpolatedStringHandler.ToStringAndClear());
                <i>5__1++;
            }
            else
            {
                <>1__state = -1;
                <i>5__1 = 0;
            }
            if (<i>5__1 < count)
            {
                <>2__current = <>4__this.HeavyTask(<i>5__1);
                <>1__state = 1;
                return true;
            }
            return false;
        }

        bool IEnumerator.MoveNext()
        {
            //ILSpy generated this explicit interface implementation from .override directive in MoveNext
            return this.MoveNext();
        }

        [DebuggerHidden]
        void IEnumerator.Reset()
        {
            throw new NotSupportedException();
        }

        [DebuggerHidden]
        [return: System.Runtime.CompilerServices.Nullable(1)]
        IEnumerator<int> IEnumerable<int>.GetEnumerator()
        {
            <GetTasksFromIEnumerable>d__1 <GetTasksFromIEnumerable>d__;
            if (<>1__state == -2 && <>l__initialThreadId == Environment.CurrentManagedThreadId)
            {
                <>1__state = 0;
                <GetTasksFromIEnumerable>d__ = this;
            }
            else
            {
                <GetTasksFromIEnumerable>d__ = new <GetTasksFromIEnumerable>d__1(0);
                <GetTasksFromIEnumerable>d__.<>4__this = <>4__this;
            }
            <GetTasksFromIEnumerable>d__.count = <>3__count;
            return <GetTasksFromIEnumerable>d__;
        }

        [DebuggerHidden]
        [return: System.Runtime.CompilerServices.Nullable(1)]
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<int>)this).GetEnumerator();
        }
    }

    public void M()
    {
        IEnumerator<int> enumerator = GetTasksFromIEnumerable(15).GetEnumerator();
        try
        {
            while (enumerator.MoveNext())
            {
                int current = enumerator.Current;
                Console.WriteLine(current);
            }
        }
        finally
        {
            if (enumerator != null)
            {
                enumerator.Dispose();
            }
        }
    }

    [System.Runtime.CompilerServices.NullableContext(1)]
    [IteratorStateMachine(typeof(<GetTasksFromIEnumerable>d__1))]
    private IEnumerable<int> GetTasksFromIEnumerable(int count)
    {
        <GetTasksFromIEnumerable>d__1 <GetTasksFromIEnumerable>d__ = new <GetTasksFromIEnumerable>d__1(-2);
        <GetTasksFromIEnumerable>d__.<>4__this = this;
        <GetTasksFromIEnumerable>d__.<>3__count = count;
        return <GetTasksFromIEnumerable>d__;
    }

    private int HeavyTask(int i)
    {
        Thread.Sleep(1000);
        return i;
    }
}

// 省略部分无关代码 

  1. 在 调用方 M() 方法中 foreach 关键字 为我们生成了通过GetTasksFromIEnumerable().GetEnumerator() 方法返回的 IEnumerator<int> 类型的结果 的迭代器 ,然后通过try-finally 包裹了原来 forech 中的方法块 finally 最终会释放获取到的迭代器.

  2. GetTasksFromIEnumerable() 方法中为我们生成了一个状态机 <GetTasksFromIEnumerable>d__1 初始化状态为 -2 ,然后将 当前所处的实例 this 和 入参 count 作为字段

  3. 通过 <GetTasksFromIEnumerable>d__1 中的 IEnumerable<int>.GetEnumerator() 方法实现该状态机的初始化,其中还包含了对调用方线程与迭代器初始化线程是否一致的判断,如果不一致的话会将其重置为当前线程.

  4. 然后通过 MoveNext 不断获取当前迭代的值 ,可以看到原来的

    yield return HeavyTask(i); 
    

    转化成了

     if (<i>5__1 < count) // 原来条件
     {
         <>2__current = <>4__this.HeavyTask(<i>5__1);
         <>1__state = 1; // 将 state 标记为 1, 使其走到上面对应的 if 语句
         return true; // 并表示可以继续移动
     }
     return false; // 结束
    

    state 改变为 1 之后 , 执行原 yield 后的代码块

    if (num != 0)
    {
    
        if (num != 1)
        {
            return false;
        }
    
        // 重新标记为 -1
        <>1__state = -1;
    
        // 对应原来的 Console.WriteLine($"已返回当前值:{i},准备下一次");
        DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(13, 1);
        defaultInterpolatedStringHandler.AppendLiteral("已返回当前值:");
        defaultInterpolatedStringHandler.AppendFormatted(<i>5__1);
        defaultInterpolatedStringHandler.AppendLiteral(",准备下一次");
        Console.WriteLine(defaultInterpolatedStringHandler.ToStringAndClear());
        
        // 循环遍历累加
        <i>5__1++;
    }
    else
    {
        <>1__state = -1;
        // 这里为啥会重置为 0 ?
        <i>5__1 = 0;
    }
    
    

问题

上面说了为了这碟醋包了这顿饺子,那么这顿饺子是什么呢?

其实后面发现不是 IEnumerable 或者IAsyncEnumerable 的问题 而是对于异步中对象的生命周期的理解问题.

之前再写一个解析网页元素项的辅助方法时,本着能少写一个少写一个的原则(哈哈哈,偷懒),想将传入的 html 字符串转成流 然后调用另一个写好的 Stream 解析的函数.


/// 偷懒的函数
public static IAsyncEnumerable<TTableRow> ParseSimpleTable<TTableRow>(string html, string tableSelector, string rowSelector, Func<IElement, ValueTask<TTableRow>> rowParseFunc)
{
    // 出于直觉 在这里 using 
    using MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(html));

    return ParseSimpleTable(stream, tableSelector, rowSelector, rowParseFunc);
}

/// <summary>
/// 解析简单表格
/// </summary>
/// <typeparam name="TTableRow">解析结果项</typeparam>
/// <param name="stream">要解析的流</param>
/// <param name="tableSelector">table选择器</param>
/// <param name="rowSelector">行选择器</param>
/// <param name="rowParseFunc">行解析方法委托</param>
/// <returns></returns>
/// <exception cref="ArgumentException"></exception>
public static async IAsyncEnumerable<TTableRow> ParseSimpleTable<TTableRow>(Stream stream, string tableSelector, string rowSelector, Func<IElement, ValueTask<TTableRow>> rowParseFunc)
{
    IBrowsingContext browsingContext = BrowsingContext.New();

    var htmlParser = browsingContext.GetService<IHtmlParser>();

    if (htmlParser == null)
        throw new ArgumentException(nameof(htmlParser));

    using IDocument document = await htmlParser.ParseDocumentAsync(stream);

    var tableElement = document.QuerySelector(tableSelector);
    if (tableElement == null)
        yield break;

    var rowsElement = tableElement.QuerySelectorAll(rowSelector);
    if (rowsElement == null || !rowsElement.Any())
        yield break;

    foreach (var rowElement in rowsElement)
    {
        yield return await rowParseFunc(rowElement);
    }
}

由于出于直觉的 using 了这个流,下意识的以为这个 Stream 会在这个函数执行后释放, 然后就...异常了

Cannot access a closed Stream.
  Data = <enumerable Count: 0>
  HelpLink = <null>
  HResult = -2146232798
  InnerException = <null>
  Message = Cannot access a closed Stream.
  ObjectName = 
  Source = System.Private.CoreLib
  StackTrace =    at System.IO.MemoryStream.get_Length()
   at Program.<<Main>$>g__GetBytes|0_1(Stream stream)+MoveNext() in :line 20
   at Program.<<Main>$>g__GetBytes|0_1(Stream stream)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at Program.<Main>$(String[] args) in :line 3
   at Program.<Main>$(String[] args) in :line 3
   at Program.<Main>(String[] args)
  TargetSite = Void ThrowObjectDisposedException_StreamClosed(System.String)

一般流报这个异常都是被提前释放的问题,我一想噢应该时异步的问题,然后我去看生成后的代码,恍然大悟.


// 模拟场景 

 private IAsyncEnumerable<byte> ParseSimpleTable<TTableRow>(string s)
    {
        MemoryStream memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(s));
        try
        {
            // 这里是一个异步方法,但是我并没有等待完成,而是转交给了调用方等待
            return ParseSimpleTable(memoryStream);
        }
        finally
        {
            if (memoryStream != null)
            {
                // 没有等待所以这里 memoryStream 被释放了 ,但是 GetBytes 方法还在执行
                ((IDisposable)memoryStream).Dispose();
            }
        }
    }

生成后的代码 一目了然,memoryStream 被提前释放了.

解决错误方式很简单

  1. 等待完成 await ParseSimpleTable 后释放,在当前方法块中等待完成,但是无法直接返回 IAsyncEnumerable了,必须配合 yield 关键字

  2. 在最终调用 Stream 的函数中 using 或 调用 Close() ,也就是在具体 yield 方法块之后调用 ,但是在最底层释放来自调用方的流感觉有点怪怪的(不排除调用方的流还要重用...这里给他关闭了就会显得坑!)

  3. 不偷懒了,手动写一个 基于 string html 解析的函数(哈哈),就没有上述问题了,也避免了重复创建流对象的问题(滑稽).

总结

在异步中使用一些需要释放的资源的时候需要注意对象的生命周期,不然可能造成内存泄漏或者代码异常.
尤其是编写一些底层一点点的代码时,往往为了优化而不会同步等待资源到位,而是通过异步的方式访问,这个时候关注对象的生命周期就显得尤为重要了.
12-15 10:54