本文解决的问题如上所示,Datagram(数据报)类型的 Socket,这包含 “ICMP、UDP”、“RAW IP”等,那么为什么会发生这样的问题?
我们在阻塞模式下是不存在这个问题,但本文描述的问题仅仅包含在以下几个 Socket 异步接口实现上面。
以下为发生故障的 .NET API:
1、System::Net::Sockets::Socket::BeginSendTo
2、System::Net::Sockets::Socket::BeginReceiveFrom
3、System::Net::Sockets::Socket::EndSendTo
4、System::Net::Sockets::Socket::EndReceiveFrom
5、System::Net::Sockets::Socket::SendToAsync
6、System::Net::Sockets::Socket::ReceiveFromAsync
总结:
于 Windows 平台上而言,则是 .NET?基于 CompletionRoutine(完成例程)、IoCompletionPort(完成端口)实现导致的问题。
于 Linux 平台上而言,则是 .NET 基于 libuv(epoll)实现导致的问题。
但这东西可以说它并不是 BUG,但仅仅只是造成大量不必要的内存被分配,挤占系统硬件资源(服务器上是很忌讳的),C/C++ 直接利用 IOCP、EPOLL 实现多线程的工作队列仍旧有很多人会造成这样的问题存在,比如:把所有的需要 Completion Context 投递到 ROOT IoCP?or epoll,所有完成工作线程从 ROOT 上等待,操作系统触发事件到达,并不是固定某个特定的线程的,多个线程等待相同的 handle,触发可能是等待的任意一个线程。
问题:(源于)
为了实现真正高效的,事件驱动状态机(EDSM),并且提供与多个平台相近的API实现,每个事件(IoCompletionContext)被投递到 .NET ThreadPool(完成事件上下文工作线程池)进行驱动,Windows 平台基于 IOCP/IOCR 模型进行驱动,Linux 由 epoll 模型进行驱动,它类似 C/C++ boost::asio 的 boost::asio::io_context(io_service)IO完成队列驱动上下文,但与 boost::asio 框架不同的是,每个 Socket 实例的每次 “完成事务、Completion Transaction” 可以由 .NET 完成队列线程池?dispatch,而 boost::asio,仅仅只允许最终由每个 asio 确切的具体对象构造时指定的 Executor(io_context)?进行驱动,即不存在 .NET Implement,可能导致的,由A线程执行可变为A线程自身直接完成,或由B/C/D任意一个线程进行完成。
而对于一个 MT(多线程)架构的 Datagram 网络应用程序而言,我们不需要为每个 Socket 都需要分配一个有效的内存缓冲区(RAM Buffer),仅仅只需要为每个线程分配固态缓冲区 64KB(65535字节)即可,纵然是分配此缓冲区大小仍旧存在 RAM 空间浪费(C/C++ 建议?64KB 尽量按照粒度分配,.NET 程式分配器自行对齐)
准确需要按照不同的网络来区别需要多大的缓冲区,例如 UDP 协议则应最大缓冲区大小为:65535 - sizeof(udp_hdr, 8),但借助 .NET 运行时提供的 Socket 是无法办到的,所以我们必须要解决这个问题,每个 UDP Dgram-socket 都需分配 64K 内存,那么 1G RAM 满打满算顶多只能分配 1.6W 个左右缓冲区,然而现实情况是根本不可能,能够分配几千个那都是烧了高香,因为你不可以单纯只算你具体分配了多少,.NET 一个托管对象实例需要占用很多无用户内存的。
#pragma pack(push, 1) ? ? ? ? ? ? struct udp_hdr { ? ? ? ? ? ? public: ? ? ? ? ? ? ? ? unsigned short ? ? ? ? ? ? ? ? ?src; ? ? ? ? ? ? ? ? unsigned short ? ? ? ? ? ? ? ? ?dest; ?/* src/dest UDP ports */ ? ? ? ? ? ? ? ? unsigned short ? ? ? ? ? ? ? ? ?len; ? ? ? ? ? ? ? ? unsigned short ? ? ? ? ? ? ? ? ?chksum;
? ? ? ? ? ? public: ? ? ? ? ? ? ? ? static struct udp_hdr* ? ? ? ? ?Parse(struct ip_hdr* iphdr, const void* packet, int size); ? ? ? ? ? ? }; #pragma pack(pop)
但 TCP/IP 类型的 Stream Socket 是不建议为分个线程都分配固态缓冲区的,理由为每个操作系统都为 TCP/IP 进行过单独的吞吐优化,如果一个线程上多个 TCP Socket 使用相同 RAM 地址缓冲区数据,那么这会导致严重的 TCP/IP Socket 网络IO的收发(Received[RX]、Sent[TX])吞吐性能严重降速,这不是框架问题,而是系统层实现问题,大量的SYN速度能快起来那就有问题了,强行解决办法则是抛弃系统层实现的易于应用层用户调用 TCP Socket,自主实现或引入一个 3rd?TCP/IP 网络协议栈。
?那么既然已知道有这么个疑难问题,处理上就很简单的,本人基于 boost::asio 封装了一个可用的?C/C++/C# 库代码,看到本贴文内容的小伙伴,可自行通过其它方法自行实现或摘要本文提供的处理代码。
Usage:
[MTAThread]
private static void Main(string[] args)
{
IPAddress interfaceIP = IPAddress.Parse("192.168.0.24");
AsyncScheduler scheduler = new AsyncScheduler(1);
AsyncContext context = scheduler.GetContext();
AsyncSocket socket = context.CreateSocket(new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp));
socket.Socket.Bind(new IPEndPoint(interfaceIP, 0));
byte[] buffer = context.Buffer;
socket.SendTo(new byte[] { (byte)'1', (byte)'2', (byte)'3' }, 0, 3, new IPEndPoint(interfaceIP, 7000), null);
socket.ReceiveFrom(buffer, 0, buffer.Length, (bytesTransferred, sourceEP) =>
{
string RX = Encoding.UTF8.GetString(buffer, 0, Math.Max(bytesTransferred, 0));
Console.WriteLine($"bytesTransferred[{bytesTransferred}] RX[{RX}]");
});
Console.ReadKey(false);
}
C/C++ Dlllibrary Implement:?
#include <stdio.h>
#include <signal.h>
#include <limits.h>
#ifdef _WIN32
#include <WinSock2.h>
#include <Windows.h>
#pragma comment(lib, "Ws2_32.lib")
#else
#include <pthread.h>
#include <sched.h>
#include <sys/resource.h>
#include <sys/file.h>
#endif
#include <unordered_map>
#include <functional>
#include <thread>
#include <boost/asio.hpp>
#ifndef LIBASIO_API
#ifdef __cplusplus
#ifdef _WIN32
#define LIBASIO_API extern "C" __declspec(dllexport)
#else
#define LIBASIO_API extern "C" __attribute__((visibility("default")))
#endif
#else
#define LIBASIO_API
#endif
#endif
typedef struct {
uint32_t v4_or_v6_;
union {
struct {
uint32_t address_;
uint32_t port_;
} in4_;
struct {
char address_[16];
uint32_t port_;
} in6_;
char data_[20];
};
} libasio_endpoint;
typedef void(*libasio_post_callback)(void* context_, uint64_t key_);
typedef void(*libasio_sendto_callback)(void* socket_, uint64_t key_, int length_);
typedef void(*libasio_recvfrom_callback)(void* socket_, uint64_t key_, int length_, libasio_endpoint* remoteEP_);
typedef std::shared_ptr<boost::asio::ip::udp::socket> libasio_socket;
typedef std::shared_ptr<boost::asio::io_context> libasio_context;
typedef std::unordered_map<boost::asio::io_context*, libasio_context> libasio_context_hashtable;
typedef std::unordered_map<boost::asio::ip::udp::socket*, libasio_socket> libasio_socket_hashtable;
typedef std::unordered_map<boost::asio::ip::udp::socket*, libasio_context> libasio_so2ctx;
static libasio_so2ctx _so2ctxs_;
static libasio_socket_hashtable _sockets_;
static libasio_context_hashtable _contexts_;
static std::mutex _syncobj_;
#define __lock__(obj) \
do {\
std::lock_guard<std::mutex> scoped_(obj);
#define __unlock__ \
} while(0);
static libasio_context libasio_getcontext(boost::asio::io_context* context_) {
libasio_context_hashtable::iterator tail = _contexts_.find(context_);
libasio_context_hashtable::iterator endl = _contexts_.end();
if (tail == endl) {
return NULL;
}
return tail->second;
}
static libasio_context libasio_getcontext(boost::asio::ip::udp::socket* socket_) {
libasio_so2ctx::iterator tail = _so2ctxs_.find(socket_);
libasio_so2ctx::iterator endl = _so2ctxs_.end();
if (tail == endl) {
return NULL;
}
return tail->second;
}
static libasio_socket libasio_getsocket(boost::asio::ip::udp::socket* socket_) {
libasio_socket_hashtable::iterator tail = _sockets_.find(socket_);
libasio_socket_hashtable::iterator endl = _sockets_.end();
if (tail == endl) {
return NULL;
}
return tail->second;
}
LIBASIO_API
boost::asio::io_context* libasio_newcontext() {
std::shared_ptr<boost::asio::io_context> context_ = std::make_shared<boost::asio::io_context>();
std::thread([context_] {
#ifdef _WIN32
SetThreadPriority(GetCurrentProcess(), THREAD_PRIORITY_HIGHEST);
#else
/* ps -eo state,uid,pid,ppid,rtprio,time,comm */
struct sched_param param_;
param_.sched_priority = sched_get_priority_max(SCHED_FIFO); // SCHED_RR
pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m_);
#endif
boost::asio::io_context::work work_(*context_);
boost::system::error_code ec_;
context_->run(ec_);
__lock__(_syncobj_){
std::shared_ptr<boost::asio::io_context> p = libasio_getcontext(context_.get());
if (p) {
_contexts_.erase(context_.get());
}
} __unlock__;
}).detach();
boost::asio::io_context* p = context_.get();
__lock__(_syncobj_) {
_contexts_[p] = std::move(context_);
} __unlock__;
return p;
}
LIBASIO_API
void libasio_closecontext(boost::asio::io_context* context_) {
if (!context_) {
return;
}
__lock__(_syncobj_) {
std::shared_ptr<boost::asio::io_context> p = libasio_getcontext(context_);
if (!p) {
return;
}
context_->stop();
_contexts_.erase(context_);
} __unlock__;
}
LIBASIO_API
bool libasio_postcontext(boost::asio::io_context* context_, uint64_t key_, libasio_post_callback callback_) {
if (!context_ || !callback_) {
return false;
}
__lock__(_syncobj_) {
std::shared_ptr<boost::asio::io_context> p = libasio_getcontext(context_);
if (!p) {
return false;
}
context_->post([context_, key_, callback_] {
callback_(context_, key_);
});
} __unlock__;
return true;
}
LIBASIO_API
boost::asio::ip::udp::socket* libasio_createsocket(boost::asio::io_context* context_, int sockfd_, bool v4_or_v6_) {
if (!context_ || sockfd_ == -1) {
return NULL;
}
__lock__(_syncobj_) {
libasio_context context = libasio_getcontext(context_);
if (!context) {
return NULL;
}
libasio_socket socket_ = std::make_shared<boost::asio::ip::udp::socket>(*context_);
if (v4_or_v6_) {
socket_->assign(boost::asio::ip::udp::v4(), sockfd_);
}
else {
socket_->assign(boost::asio::ip::udp::v6(), sockfd_);
}
boost::asio::ip::udp::socket* r_ = socket_.get();
_sockets_[r_] = std::move(socket_);
_so2ctxs_[r_] = std::move(context);
return r_;
} __unlock__;
}
LIBASIO_API
void libasio_closesocket(boost::asio::ip::udp::socket* socket_) {
if (!socket_) {
return;
}
__lock__(_syncobj_) {
libasio_context context = libasio_getcontext(socket_);
if (!context) {
return;
}
libasio_socket socket = libasio_getsocket(socket_);
boost::asio::post(*context, [context, socket] {
if (socket->is_open()) {
boost::system::error_code ec;
try {
socket->cancel(ec);
}
catch (std::exception&) {}
try {
socket->shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
}
catch (std::exception&) {}
try {
socket->close(ec);
}
catch (std::exception&) {}
}
});
_sockets_.erase(socket_);
_so2ctxs_.erase(socket_);
} __unlock__;
}
LIBASIO_API
bool libasio_recvfrom(boost::asio::ip::udp::socket* socket_, uint64_t key_, char* buf_, int size_, libasio_recvfrom_callback callback_) {
if (!socket_ || !buf_ || size_ <= 0 || !callback_) {
return false;
}
std::shared_ptr<boost::asio::ip::udp::endpoint> endpoint_ = std::make_shared<boost::asio::ip::udp::endpoint>();
__lock__(_syncobj_) {
libasio_context context_ = libasio_getcontext(socket_);
if (!context_) {
return false;
}
libasio_socket socket = libasio_getsocket(socket_);
socket->async_receive_from(boost::asio::buffer(buf_, size_), *endpoint_,
[context_, socket, key_, callback_, endpoint_](const boost::system::error_code& ec, uint32_t sz) {
int length_ = -1;
if (!ec) {
length_ = sz;
}
libasio_endpoint stack_;
if (endpoint_->protocol() == boost::asio::ip::udp::v4()) {
stack_.v4_or_v6_ = 1;
stack_.in4_.address_ = htonl(endpoint_->address().to_v4().to_uint());
stack_.in4_.port_ = endpoint_->port();
callback_(socket.get(), key_, length_, &stack_);
}
else if (endpoint_->protocol() == boost::asio::ip::udp::v6()) {
stack_.v4_or_v6_ = 0;
stack_.in6_.port_ = endpoint_->port();
boost::asio::ip::address_v6::bytes_type addr_bytes_ = endpoint_->address().to_v6().to_bytes();
memcpy(stack_.in6_.address_, addr_bytes_.data(), addr_bytes_.size());
callback_(socket.get(), key_, length_, &stack_);
}
else {
callback_(socket.get(), key_, length_, NULL);
}
});
} __unlock__;
return true;
}
LIBASIO_API
bool libasio_sendto(boost::asio::ip::udp::socket* socket_, uint64_t key_, char* buf_, int size_, libasio_endpoint* endpoint_, libasio_sendto_callback callback_) {
if (!socket_ || !buf_ || size_ <= 0 || !endpoint_) {
return false;
}
boost::asio::ip::udp::endpoint sendtoEP_;
if (endpoint_->v4_or_v6_) {
sendtoEP_ = boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4(ntohl(endpoint_->in4_.address_)), endpoint_->in4_.port_);
}
else {
boost::asio::ip::address_v6::bytes_type addr_bytes_;
memcpy(addr_bytes_.data(), endpoint_->in6_.address_, addr_bytes_.size());
sendtoEP_ = boost::asio::ip::udp::endpoint(boost::asio::ip::address_v6(addr_bytes_), endpoint_->in6_.port_);
}
__lock__(_syncobj_) {
libasio_context context_ = libasio_getcontext(socket_);
if (!context_) {
return false;
}
libasio_socket socket = libasio_getsocket(socket_);
socket->async_send_to(boost::asio::buffer(buf_, size_), sendtoEP_,
[context_, socket, key_, callback_](const boost::system::error_code& ec, uint32_t sz) {
if (callback_) {
int length_ = -1;
if (!ec) {
length_ = sz;
}
callback_(socket.get(), key_, length_);
}
});
} __unlock__;
return true;
}
C# AsyncContext.cs
namespace Ppp.Net.Auxiliary
{
using System;
using System.Security;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.Sockets;
#if NETCOREAPP
using System.Runtime.CompilerServices;
#endif
using System.Runtime.InteropServices;
using System.Threading;
public unsafe sealed class AsyncContext : IDisposable
{
[DllImport("libasio", EntryPoint = "libasio_newcontext", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.SysInt)]
private static extern IntPtr libasio_new_context();
[DllImport("libasio", EntryPoint = "libasio_closecontext", CallingConvention = CallingConvention.Cdecl)]
private static extern void libasio_closecontext([MarshalAs(UnmanagedType.SysInt)] IntPtr context_);
[DllImport("libasio", EntryPoint = "libasio_postcontext", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern bool libasio_postcontext([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, long key_, [MarshalAs(UnmanagedType.FunctionPtr)] libasio_post_callback callback_);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
private delegate void libasio_post_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, long key_);
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private IntPtr _handle = IntPtr.Zero;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private bool _disposed = false;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private long _mapkey = 0;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly object _synobj = new object();
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly byte[] _buffer = new byte[ushort.MaxValue];
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private GCHandle _buffer_gc = default(GCHandle);
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly AsyncScheduler _scheduler = new AsyncScheduler(Environment.ProcessorCount);
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly libasio_post_callback _callback = (context_, key_) =>
{
_callbacks.TryGetValue(context_, out ConcurrentDictionary<long, IOCompletionCallback> callbacks);
if (callbacks == null)
{
return;
}
callbacks.TryRemove(key_, out IOCompletionCallback callback_);
if (callback_ == null)
{
return;
}
callback_.callback_(callback_.state_);
};
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, IOCompletionCallback>> _callbacks =
new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, IOCompletionCallback>>();
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
[SecurityCritical]
[SecuritySafeCritical]
static AsyncContext() => GCHandle.Alloc(_callback);
private sealed class IOCompletionCallback
{
public object state_;
public Action<object> callback_;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private ConcurrentDictionary<long, IOCompletionCallback> GetAllCallback()
{
lock (this._synobj)
{
if (this._disposed)
{
return null;
}
lock (_callbacks)
{
_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, IOCompletionCallback> d);
if (d == null)
{
d = new ConcurrentDictionary<long, IOCompletionCallback>();
if (!_callbacks.TryAdd(this._handle, d))
{
d = null;
}
}
return d;
}
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private long BindCallback(Action<object> callback, object state)
{
ConcurrentDictionary<long, IOCompletionCallback> callbacks = this.GetAllCallback();
if (callbacks == null)
{
return 0;
}
IOCompletionCallback cb = new IOCompletionCallback()
{
callback_ = callback,
state_ = state,
};
long key_ = 0;
do
{
while (key_ == 0)
{
key_ = Interlocked.Increment(ref this._mapkey);
}
} while (!callbacks.TryAdd(key_, cb));
return key_;
}
public static AsyncScheduler Scheduler
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => AsyncContext._scheduler;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public AsyncContext()
{
this._handle = libasio_new_context();
this._buffer_gc = GCHandle.Alloc(this._buffer, GCHandleType.Pinned);
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
~AsyncContext() => this.Dispose();
public IntPtr Handle
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => Interlocked.CompareExchange(ref this._handle, IntPtr.Zero, IntPtr.Zero);
}
public byte[] Buffer
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => this._buffer;
}
public object Tag
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get;
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
set;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public AsyncSocket CreateSocket(Socket socket)
{
if (socket == null)
{
throw new ArgumentNullException(nameof(socket));
}
return new AsyncSocket(this, socket);
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public bool Post(Action<object> callback, object state)
{
if (callback == null)
{
return false;
}
lock (this._synobj)
{
if (this._disposed)
{
return false;
}
long key_ = this.BindCallback(callback, state);
if (key_ == 0)
{
return false;
}
return libasio_postcontext(this._handle, key_, _callback);
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public void Dispose()
{
lock (this._synobj)
{
if (!this._disposed)
{
this.Post((state) =>
{
var gc = __makeref(this._buffer_gc);
if (__refvalue(gc, GCHandle).IsAllocated)
{
__refvalue(gc, GCHandle).Free();
}
libasio_closecontext(this._handle);
_callbacks.TryRemove(this._handle, out ConcurrentDictionary<long, IOCompletionCallback> _);
}, default(object));
this._disposed = true;
}
}
GC.SuppressFinalize(this);
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public static AsyncContext GetContext() => AsyncContext._scheduler.GetContext();
}
}
C#?AsyncScheduler.cs
namespace Ppp.Net.Auxiliary
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
#if NETCOREAPP
using System.Runtime.CompilerServices;
#endif
public sealed class AsyncScheduler : IDisposable
{
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly LinkedList<AsyncContext> _contexts = new LinkedList<AsyncContext>();
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly object _syncobj = new object();
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public AsyncScheduler(int concurrent)
{
if (concurrent < 1)
{
concurrent = 1;
}
for (int i = 0; i < concurrent; i++)
{
AsyncContext context = new AsyncContext();
this._contexts.AddLast(context);
}
}
public int Concurrent
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => _contexts.Count;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public void Dispose()
{
lock (this._syncobj)
{
LinkedListNode<AsyncContext> node = this._contexts.First;
while (node != null)
{
node.Value.Dispose();
node = node.Next;
}
this._contexts.Clear();
}
GC.SuppressFinalize(this);
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public AsyncContext GetContext()
{
lock (this._syncobj)
{
LinkedListNode<AsyncContext> node = this._contexts.First;
if (node == null)
{
return null;
}
this._contexts.RemoveFirst();
this._contexts.AddLast(node);
return node.Value;
}
}
}
}
C# AsyncSocket.cs?
namespace Ppp.Net.Auxiliary
{
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
#if NETCOREAPP
using System.Runtime.CompilerServices;
#endif
using System.Runtime.InteropServices;
using System.Security;
using System.Threading;
public unsafe sealed class AsyncSocket : IDisposable
{
[DllImport("libasio", EntryPoint = "libasio_createsocket", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.SysInt)]
private static extern IntPtr libasio_createsocket([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, int sockfd_, bool v4_or_v6_);
[DllImport("libasio", EntryPoint = "libasio_closesocket", CallingConvention = CallingConvention.Cdecl)]
private static extern void libasio_closesocket([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_);
[DllImport("libasio", EntryPoint = "libasio_sendto", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern bool libasio_sendto([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, void* buf_, int size_,
libasio_endpoint* endpoint_, [MarshalAs(UnmanagedType.FunctionPtr)] libasio_sendto_callback callback_);
[DllImport("libasio", EntryPoint = "libasio_recvfrom", CallingConvention = CallingConvention.Cdecl)]
private static extern bool libasio_recvfrom([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, void* buf_, int size_,
[MarshalAs(UnmanagedType.FunctionPtr)] libasio_recvfrom_callback callback_);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
private delegate void libasio_sendto_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, int length_);
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
private delegate void libasio_recvfrom_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, int length_, libasio_endpoint* remoteEP_);
[StructLayout(LayoutKind.Explicit)]
private struct libasio_endpoint
{
[StructLayout(LayoutKind.Sequential)]
public struct in4
{
public uint address_;
public int port_;
}
[StructLayout(LayoutKind.Sequential)]
public struct in6
{
public long address_1_;
public long address_2_;
public int port_;
}
[FieldOffset(0)]
public uint v4_or_v6_;
[FieldOffset(4)]
public in4 in4_;
[FieldOffset(4)]
public in6 in6_;
[FieldOffset(0)]
public long data_1_;
[FieldOffset(8)]
public long data_2_;
[FieldOffset(16)]
public long data_3_;
};
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly object _synobj = new object();
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly Socket _socket = null;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private readonly AsyncContext _context = null;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private bool _disposed = false;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private IntPtr _handle = IntPtr.Zero;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private long _mapkey = 0;
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly libasio_recvfrom_callback _recvfrom_callback = (socket_, key_, length_, remoteEP_) =>
{
_recvfrom_callbacks.TryGetValue(socket_, out ConcurrentDictionary<long, Action<int, EndPoint>> callbacks);
if (callbacks == null)
{
return;
}
callbacks.TryRemove(key_, out Action<int, EndPoint> callback_);
if (callback_ == null)
{
return;
}
IPEndPoint remoteEP = null;
if (remoteEP_ != null)
{
if (remoteEP_->v4_or_v6_ != 0)
{
remoteEP = new IPEndPoint(new IPAddress(remoteEP_->in4_.address_), remoteEP_->in4_.port_);
}
else
{
byte[] address_bytes = new byte[16];
fixed (byte* paddr_bytes = address_bytes)
{
long* paddr_i64 = (long*)paddr_bytes;
paddr_i64[0] = remoteEP_->in6_.address_1_;
paddr_i64[1] = remoteEP_->in6_.address_2_;
}
remoteEP = new IPEndPoint(new IPAddress(address_bytes), remoteEP_->in4_.port_);
}
}
callback_(length_, remoteEP);
};
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int, EndPoint>>> _recvfrom_callbacks =
new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int, EndPoint>>>();
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly libasio_sendto_callback _sendto_callback = (socket_, key_, length_) =>
{
_sendto_callbacks.TryGetValue(socket_, out ConcurrentDictionary<long, Action<int>> callbacks);
if (callbacks == null)
{
return;
}
callbacks.TryRemove(key_, out Action<int> callback_);
if (callback_ == null)
{
return;
}
callback_(length_);
};
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int>>> _sendto_callbacks =
new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int>>>();
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
[SecurityCritical]
[SecuritySafeCritical]
static AsyncSocket()
{
GCHandle.Alloc(_sendto_callback);
GCHandle.Alloc(_recvfrom_callback);
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private ConcurrentDictionary<long, Action<int, EndPoint>> GetAllReceiveFromCallback()
{
lock (this._synobj)
{
if (this._disposed)
{
return null;
}
lock (_recvfrom_callbacks)
{
_recvfrom_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, Action<int, EndPoint>> d);
if (d == null)
{
d = new ConcurrentDictionary<long, Action<int, EndPoint>>();
if (!_recvfrom_callbacks.TryAdd(this._handle, d))
{
d = null;
}
}
return d;
}
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private long BindReceiveFromCallback(Action<int, EndPoint> callback)
{
ConcurrentDictionary<long, Action<int, EndPoint>> callbacks = this.GetAllReceiveFromCallback();
if (callbacks == null)
{
return 0;
}
long key_ = 0;
do
{
while (key_ == 0)
{
key_ = Interlocked.Increment(ref this._mapkey);
}
} while (!callbacks.TryAdd(key_, callback));
return key_;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private ConcurrentDictionary<long, Action<int>> GetAllSendToCallback()
{
lock (this._synobj)
{
if (this._disposed)
{
return null;
}
lock (_sendto_callbacks)
{
_sendto_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, Action<int>> d);
if (d == null)
{
d = new ConcurrentDictionary<long, Action<int>>();
if (!_sendto_callbacks.TryAdd(this._handle, d))
{
d = null;
}
}
return d;
}
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private long BindSendToCallback(Action<int> callback)
{
ConcurrentDictionary<long, Action<int>> callbacks = this.GetAllSendToCallback();
if (callbacks == null)
{
return 0;
}
long key_ = 0;
do
{
while (key_ == 0)
{
key_ = Interlocked.Increment(ref this._mapkey);
}
} while (!callbacks.TryAdd(key_, callback));
return key_;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
internal AsyncSocket(AsyncContext context, Socket socket)
{
this._handle = libasio_createsocket(context.Handle, socket.Handle.ToInt32(), socket.AddressFamily == AddressFamily.InterNetwork);
this._socket = socket;
this._context = context;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
~AsyncSocket() => this.Dispose();
public IntPtr Handle
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => Interlocked.CompareExchange(ref this._handle, IntPtr.Zero, IntPtr.Zero);
}
public Socket Socket
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => this._socket;
}
public AsyncContext Context
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get => this._context;
}
public object Tag
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get;
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
set;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public bool SendTo(byte[] buffer, int offset, int length, EndPoint destinationEP, Action<int> callback)
{
if (buffer == null ||
offset < 0 ||
length <= 0 ||
(offset + length) > buffer.Length ||
SocketExtension.CleanedUp(this._socket))
{
return false;
}
libasio_endpoint* localEP = stackalloc libasio_endpoint[1];
if (destinationEP.AddressFamily == AddressFamily.InterNetwork)
{
IPEndPoint ipep = (IPEndPoint)destinationEP;
localEP->v4_or_v6_ = 1;
localEP->in4_.port_ = ipep.Port;
fixed (byte* pb = ipep.Address.GetAddressBytes())
{
localEP->in4_.address_ = *(uint*)pb;
}
}
else
{
IPEndPoint ipep = (IPEndPoint)destinationEP;
localEP->v4_or_v6_ = 0;
localEP->in6_.port_ = ipep.Port;
fixed (byte* pb = ipep.Address.GetAddressBytes())
{
long* pl = (long*)pb;
localEP->in6_.address_1_ = pl[0];
localEP->in6_.address_2_ = pl[1];
}
}
fixed (byte* p = buffer)
{
lock (this._synobj)
{
if (this._disposed)
{
return false;
}
long key_ = this.BindSendToCallback(callback);
if (key_ == 0)
{
return false;
}
return libasio_sendto(this.Handle, key_, p + offset, length, localEP, _sendto_callback);
}
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public bool ReceiveFrom(byte[] buffer, int offset, int length, Action<int, EndPoint> callback)
{
if (buffer == null ||
callback == null ||
offset < 0 ||
length <= 0 ||
(offset + length) > buffer.Length ||
SocketExtension.CleanedUp(this._socket))
{
return false;
}
fixed (byte* p = buffer)
{
lock (this._synobj)
{
if (this._disposed)
{
return false;
}
long key_ = this.BindReceiveFromCallback(callback);
if (key_ == 0)
{
return false;
}
return libasio_recvfrom(this.Handle, key_, p + offset, length, _recvfrom_callback);
}
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public void Close() => this.Dispose();
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public void Dispose()
{
lock (this._synobj)
{
if (!this._disposed)
{
this._disposed = true;
lock (this._synobj)
{
SocketExtension.Closesocket(this._socket);
libasio_closesocket(this._handle);
}
}
}
_sendto_callbacks.TryRemove(this._handle, out ConcurrentDictionary<long, Action<int>> _);
_recvfrom_callbacks.TryRemove(this._handle, out ConcurrentDictionary<long, Action<int, EndPoint>> __);
GC.SuppressFinalize(this);
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public void Post(Action<object> callback, object state) => this._context.Post(callback, state);
}
}
C# SocketExtension.cs
public static Func<Socket, bool> CleanedUp
{
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
get;
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
private set;
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
[SecurityCritical]
[SecuritySafeCritical]
static SocketExtension()
{
SocketExtension.CleanedUp = SocketExtension.CompileCleanedUp();
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
[SecurityCritical]
[SecuritySafeCritical]
private static Func<Socket, bool> CompileCleanedUp()
{
try
{
PropertyInfo piCleanedUp = typeof(Socket).GetProperty("CleanedUp", BindingFlags.NonPublic | BindingFlags.Instance);
ParameterExpression s = Expression.Parameter(typeof(Socket), "s");
Expression<Func<Socket, bool>> e = Expression.Lambda<Func<Socket, bool>>(Expression.Property(s, piCleanedUp), s);
Func<Socket, bool> fCleanedUp = e.Compile();
return (socket) =>
{
if (socket == null)
{
return true;
}
if (socket is NetworkSocket NS)
{
return NS.CleanedUp;
}
return fCleanedUp(socket);
};
}
catch (Exception)
{
return (socket) =>
{
if (socket == null)
{
return true;
}
if (socket is NetworkSocket NS)
{
return NS.CleanedUp;
}
return false;
};
}
}
#if NETCOREAPP
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
public static void Closesocket(Socket socket)
{
bool cleanup = SocketExtension.CleanedUp(socket);
if (cleanup)
{
return;
}
lock (socket) // SocketExtension.Shutdown(socket);
{
try
{
socket.Shutdown(SocketShutdown.Send);
}
catch (Exception) { }
try
{
socket.Dispose();
}
catch (Exception) { }
}
}
?
|