IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 开发工具 -> C# DataFlow入门(二、属性配置) -> 正文阅读

[开发工具]C# DataFlow入门(二、属性配置)

   #region 一、DataflowBlockOptions 
        /***
         *  1. BoundedCapacity 最大处理数目 设置容量
         *  2. CancellationToken 用于提前取消流处理
         *  3. MaxMessagesPerTask 控制公平性 多个源,每个源处理相同的任务
         *  4. NameFormat 定义Block的名称
         *  5. TaskScheduler 任务调用逻辑
         */
        // 1.BoundedCapacity
        private BufferBlock<int> b1 = new BufferBlock<int>(new DataflowBlockOptions() {
            BoundedCapacity = 1
        });
        public void TestBoundedCapacity() {
            for (int i = 0; i < 10; i++) {
                b1.Post(i);
            }
            while (!b1.Completion.IsCompleted) {
                Console.WriteLine(b1.Receive());
            }
        }

        // 2. CancellationToken
        private static CancellationTokenSource cts = new CancellationTokenSource();

        private BufferBlock<int> b2 = new BufferBlock<int>(new DataflowBlockOptions()
        {
            CancellationToken = cts.Token
        });
        public void TestCancellationToken()
        {
            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i++)
                {
                    Thread.Sleep(1000);
                    b2.Post(i);
                    if (i == 5)
                    {
                        cts.Cancel();
                    }
                }
            });
            Task.Factory.StartNew(() =>
            {
                while (!b2.Completion.IsCompleted)
                {
                    Thread.Sleep(500);
                    if (cts.IsCancellationRequested)
                    {
                        Console.WriteLine("已取消");
                        break;
                    }
                    Console.WriteLine(b2.Receive());
                }
            });

        }

        // 3. MaxMessagesPerTask
        BufferBlock<int> b3 = new BufferBlock<int>();
        BufferBlock<int> b4 = new BufferBlock<int>();
        BufferBlock<int> b5 = new BufferBlock<int>();
        BufferBlock<int> b6 = new BufferBlock<int>(new DataflowBlockOptions() { MaxMessagesPerTask = 1 });

        public void TestMaxMessagesPerTask()
        {
            b3.LinkTo(b6);
            b4.LinkTo(b6);
            b5.LinkTo(b6);
            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 5; i++) {
                    Thread.Sleep(50);
                    b3.Post(i);
                }
            });

            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 5; i++)
                {
                    Thread.Sleep(50);
                    b4.Post(i + 10);
                }
            });

            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 5; i++)
                {
                    Thread.Sleep(50);
                    b5.Post(i + 100);
                }
            });

            Task.Factory.StartNew(() =>
            {
                while (!b6.Completion.IsCompleted)
                {
                    Console.WriteLine(b6.Receive());
                }
            });


        }

        // 4. NameFormat
        static string name = "b7";
        BufferBlock<int> b7 = new BufferBlock<int>(new DataflowBlockOptions() { NameFormat = name });

        public void TestNameFormat() {
            for (int i = 0; i < 10; i++) {
                b7.Post(i);
            }
            while (!b7.Completion.IsCompleted) {

                Console.WriteLine("name:" + b7.ToString() + " id:" + b7.Completion.Id + " value:" + b7.Receive());
            }
        }

        // 5. TaskScheduler
        /**
         * ThreadPoolTaskScheduler 默认的
         * SynchronizationContextTaskScheduler UI建议
         * ConcurrentExclusiveSchedulerPair 同步/独占
         */
        public void TestTaskScheduler()
        {
            ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair();
            BufferBlock<int> b8 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
            BufferBlock<int> b9 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
            BufferBlock<int> b10 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });

            BufferBlock<int> b11 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
            b8.LinkTo(b11);
            b9.LinkTo(b11);
            b10.LinkTo(b11);
            for (int i = 0; i < 3; i++)
            {
                b8.Post(i);
            }
            for (int i = 0; i < 3; i++)
            {
                b9.Post(i + 10);
            }
            for (int i = 0; i < 3; i++)
            {
                b10.Post(i + 100);
            }
            while (!b11.Completion.IsCompleted)
            {

                Console.WriteLine(" value:" + b11.Receive());
            }
        }

        #endregion
        #region 二、ExecutionDataflowBlockOptions 
        /****
         * DataflowBlockOptions的派生类
         *  1. BoundedCapacity 最大处理数目 设置容量
         *  2. CancellationToken 用于提前取消流处理
         *  3. MaxMessagesPerTask 控制公平性 多个源,每个源处理相同的任务(理论上是)
         *  4. NameFormat 定义Block的名称
         *  5. TaskScheduler 任务调用逻辑
         *  
         *  以上均与DataflowBlockOptions相同,不再赘述
         *  
         *  6. EnsureOrdered 默认值为false,其是用来将输出结果进行排序的
         *  7. MaxDegreeOfParallelism 最大并行任务数
         *  8. SingleProducerConstrained 还不是很清楚
         */

        TransformBlock<int, string> TransformBlock = new TransformBlock<int, string>((i) => {
            Thread.Sleep(1000);
            return "value:" + i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now;
        }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 5, EnsureOrdered = true });
        ActionBlock<string> action = new ActionBlock<string>((i) => {
            Console.WriteLine(i);
        });

        public void TestEnsureOrderedAndMaxDegreeOfParallelism() {
            TransformBlock.LinkTo(action);
            for (int i = 0; i < 50; i++) {
                TransformBlock.Post(i);
            }
        }

        ActionBlock<int> ab10 = new ActionBlock<int>((i) =>
        {
            Console.WriteLine(i);
        },new ExecutionDataflowBlockOptions() { SingleProducerConstrained = true});
        public void TestSingleProducerConstrained() {
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 5000; i++)
                {
                    ab10.Post(i);
                }
                Console.WriteLine("1 Finished post "+ stopwatch.ElapsedMilliseconds);
                ab10.Complete();
            });

            while (!ab10.Completion.IsCompleted) { 

            }
            Console.WriteLine("1 IsCompleted " + stopwatch.ElapsedMilliseconds);
        }
        #endregion
        #region 三、GroupingDataflowBlockOptions 
        /****
         * DataflowBlockOptions的派生类
         *  1. BoundedCapacity 最大处理数目 设置容量
         *  2. CancellationToken 用于提前取消流处理
         *  3. MaxMessagesPerTask 控制公平性 多个源,每个源处理相同的任务
         *  4. NameFormat 定义Block的名称
         *  5. TaskScheduler 任务调用逻辑
         *  6. EnsureOrdered 默认值为false,其是用来将输出结果进行排序的
         *  以上均与DataflowBlockOptions相同,不再赘述
         *  
         *  
         *  7. Greedy 贪婪,限制msg使用
         *  8. MaxNumberOfGroups 最大组数。限制输出的组数
         */

        BatchBlock<int> bb13 = new BatchBlock<int>(3,new GroupingDataflowBlockOptions() { Greedy=false});
        BatchBlock<int> bb14 = new BatchBlock<int>(3, new GroupingDataflowBlockOptions() { Greedy = true,MaxNumberOfGroups = 2 });

        ActionBlock<int[]> ab13 = new ActionBlock<int[]>((i) =>
        {
            string s = string.Empty;

            foreach (int m in i)
            {
                s += m + " ";
            }
            Console.WriteLine(s);
        });

        public void TestGreedy()
        {
            bb13.LinkTo(ab13);
            bb14.LinkTo(ab13);
            for (int i = 0; i < 10; i++)
            {
                bb13.Post(i);
                bb14.Post(i+100);
            }
            bb13.Complete();

            Console.WriteLine("Finished post");
        }
        #endregion

  开发工具 最新文章
Postman接口测试之Mock快速入门
ASCII码空格替换查表_最全ASCII码对照表0-2
如何使用 ssh 建立 socks 代理
Typora配合PicGo阿里云图床配置
SoapUI、Jmeter、Postman三种接口测试工具的
github用相对路径显示图片_GitHub 中 readm
Windows编译g2o及其g2o viewer
解决jupyter notebook无法连接/ jupyter连接
Git恢复到之前版本
VScode常用快捷键
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:56:36  更:2022-05-09 12:58:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/14 14:52:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码