#region 一、DataflowBlockOptions
private BufferBlock<int> b1 = new BufferBlock<int>(new DataflowBlockOptions() {
BoundedCapacity = 1
});
public void TestBoundedCapacity() {
for (int i = 0; i < 10; i++) {
b1.Post(i);
}
while (!b1.Completion.IsCompleted) {
Console.WriteLine(b1.Receive());
}
}
private static CancellationTokenSource cts = new CancellationTokenSource();
private BufferBlock<int> b2 = new BufferBlock<int>(new DataflowBlockOptions()
{
CancellationToken = cts.Token
});
public void TestCancellationToken()
{
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
Thread.Sleep(1000);
b2.Post(i);
if (i == 5)
{
cts.Cancel();
}
}
});
Task.Factory.StartNew(() =>
{
while (!b2.Completion.IsCompleted)
{
Thread.Sleep(500);
if (cts.IsCancellationRequested)
{
Console.WriteLine("已取消");
break;
}
Console.WriteLine(b2.Receive());
}
});
}
BufferBlock<int> b3 = new BufferBlock<int>();
BufferBlock<int> b4 = new BufferBlock<int>();
BufferBlock<int> b5 = new BufferBlock<int>();
BufferBlock<int> b6 = new BufferBlock<int>(new DataflowBlockOptions() { MaxMessagesPerTask = 1 });
public void TestMaxMessagesPerTask()
{
b3.LinkTo(b6);
b4.LinkTo(b6);
b5.LinkTo(b6);
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 5; i++) {
Thread.Sleep(50);
b3.Post(i);
}
});
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(50);
b4.Post(i + 10);
}
});
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(50);
b5.Post(i + 100);
}
});
Task.Factory.StartNew(() =>
{
while (!b6.Completion.IsCompleted)
{
Console.WriteLine(b6.Receive());
}
});
}
static string name = "b7";
BufferBlock<int> b7 = new BufferBlock<int>(new DataflowBlockOptions() { NameFormat = name });
public void TestNameFormat() {
for (int i = 0; i < 10; i++) {
b7.Post(i);
}
while (!b7.Completion.IsCompleted) {
Console.WriteLine("name:" + b7.ToString() + " id:" + b7.Completion.Id + " value:" + b7.Receive());
}
}
public void TestTaskScheduler()
{
ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair();
BufferBlock<int> b8 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
BufferBlock<int> b9 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
BufferBlock<int> b10 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
BufferBlock<int> b11 = new BufferBlock<int>(new DataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler });
b8.LinkTo(b11);
b9.LinkTo(b11);
b10.LinkTo(b11);
for (int i = 0; i < 3; i++)
{
b8.Post(i);
}
for (int i = 0; i < 3; i++)
{
b9.Post(i + 10);
}
for (int i = 0; i < 3; i++)
{
b10.Post(i + 100);
}
while (!b11.Completion.IsCompleted)
{
Console.WriteLine(" value:" + b11.Receive());
}
}
#endregion
#region 二、ExecutionDataflowBlockOptions
TransformBlock<int, string> TransformBlock = new TransformBlock<int, string>((i) => {
Thread.Sleep(1000);
return "value:" + i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 5, EnsureOrdered = true });
ActionBlock<string> action = new ActionBlock<string>((i) => {
Console.WriteLine(i);
});
public void TestEnsureOrderedAndMaxDegreeOfParallelism() {
TransformBlock.LinkTo(action);
for (int i = 0; i < 50; i++) {
TransformBlock.Post(i);
}
}
ActionBlock<int> ab10 = new ActionBlock<int>((i) =>
{
Console.WriteLine(i);
},new ExecutionDataflowBlockOptions() { SingleProducerConstrained = true});
public void TestSingleProducerConstrained() {
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Task.Factory.StartNew(() =>
{
for (int i = 0; i < 5000; i++)
{
ab10.Post(i);
}
Console.WriteLine("1 Finished post "+ stopwatch.ElapsedMilliseconds);
ab10.Complete();
});
while (!ab10.Completion.IsCompleted) {
}
Console.WriteLine("1 IsCompleted " + stopwatch.ElapsedMilliseconds);
}
#endregion
#region 三、GroupingDataflowBlockOptions
BatchBlock<int> bb13 = new BatchBlock<int>(3,new GroupingDataflowBlockOptions() { Greedy=false});
BatchBlock<int> bb14 = new BatchBlock<int>(3, new GroupingDataflowBlockOptions() { Greedy = true,MaxNumberOfGroups = 2 });
ActionBlock<int[]> ab13 = new ActionBlock<int[]>((i) =>
{
string s = string.Empty;
foreach (int m in i)
{
s += m + " ";
}
Console.WriteLine(s);
});
public void TestGreedy()
{
bb13.LinkTo(ab13);
bb14.LinkTo(ab13);
for (int i = 0; i < 10; i++)
{
bb13.Post(i);
bb14.Post(i+100);
}
bb13.Complete();
Console.WriteLine("Finished post");
}
#endregion
|