原文&思路参见(本例代码调整较多,也做了比较多的改进):基于Ocelot的gRpcHttp网关_dotNET跨平台的博客-CSDN博客
网关架设后,请求即为如下:
思路解析:
1、定时监控某个存放.proto的文件夹。(参见:DirectoryMonitorBackgroundService)
2、当文件有变动时 调用protoc工具生成C#代码。(本例增加一次性编译多个文件,但未解决引用其他proto的问题&相同类名的问题)
3、生成代码后调用CSharpCompilation实例来生成对应的DLL。
4、反编译DLL后取得MethodDescriptor对象,并缓存起来。
5、重新注入Ocelot中的IHttpRequester接口。该接口的作用是:根据获取到的DownstreamRequest,通过HTTP向下游主机发起请求,得到数据后再转发给请求方。
6、IHttpRequester在处理请求时判断是否包含grpc的请求值,如果是则解析出服务名与方法名,并匹配MethodDescriptor对象缓存。
7、自行实现ClientBase<T>,并在创建Channel后创建Client实例,然后调用gRPC服务。
8、取得数据后,转发给请求方
以下列出部分具体的代码。
/// <summary>
/// Subscriber for the "GrpcCodeGenerater" event. For the changed .proto files carried in the
/// event payload it generates C# code, compiles one DLL per proto file under
/// <c>plugins/.{name}</c>, and passes each DLL path to
/// <see cref="IGrpcServiceDescriptor.CreateGrpcDescriptorAsync"/>.
/// </summary>
[Serializable]
public class GrpcCodeGeneraterSubscriber : IEventSubscriber
{
    private readonly ILogger<GrpcCodeGeneraterSubscriber> logger = null;
    private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
    private readonly IGrpcServiceDescriptor serviceDescriptor = null;
    private readonly IProtoGenerateCode protoGenerateCode = null;

    public GrpcCodeGeneraterSubscriber(ILogger<GrpcCodeGeneraterSubscriber> logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
    {
        this.logger = logger;
        this.serviceDescriptor = serviceDescriptor;
        this.protoGenerateCode = protoGenerateCode;
    }

    /// <summary>
    /// Generates, compiles and registers the proto files listed in the event payload.
    /// </summary>
    /// <param name="context">Event context whose payload is expected to be a <c>string[]</c> of .proto file paths.</param>
    [EventSubscribe("GrpcCodeGenerater")]
    public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
    {
        var protefileList = context.Source.Payload as string[];
        if (protefileList == null || protefileList.Length == 0)
            return;

        try
        {
            this.protoGenerateCode.GenerateCsharpFromProto(protefileList);
            foreach (var protofilepath in protefileList)
            {
                var protoName = Path.GetFileNameWithoutExtension(protofilepath);
                // Stop at the first proto that fails to compile; the finally block still cleans up.
                if (!GenerateDllAsync(protoName))
                    return;

                var csharp_out = Path.Combine(BaseDirectory, $"plugins/.{protoName}");
                // plugin.txt records the proto file's last write time next to the compiled DLL.
                File.WriteAllText(Path.Combine(csharp_out, "plugin.txt"), File.GetLastWriteTime(protofilepath).ToString("yyyy-MM-dd HH:mm:ss"));
                await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(csharp_out, $"{protoName}.dll"));
            }
            this.logger.LogInformation($" generater dll compeleted:{string.Join<string>(",", protefileList.Select(y => Path.GetFileName(y)))} ");
        }
        finally
        {
            // Remove the intermediate .cs files on every exit path (success, compile failure, exception).
            DeleteGeneratedSources();
        }
    }

    /// <summary>
    /// Best-effort removal of the generated *.cs files from the plugins folder. A file that cannot
    /// be deleted is logged and skipped so cleanup never hides the outcome of the generation itself.
    /// </summary>
    private void DeleteGeneratedSources()
    {
        var pluginsDir = Path.Combine(BaseDirectory, "plugins");
        if (!Directory.Exists(pluginsDir))
            return;

        foreach (var sourceFile in Directory.GetFiles(pluginsDir, "*.cs"))
        {
            try
            {
                File.Delete(sourceFile);
            }
            catch (Exception ex) when (ex is IOException || ex is UnauthorizedAccessException)
            {
                this.logger.LogWarning(ex, "Could not delete generated source file {File}", sourceFile);
            }
        }
    }

    /// <summary>
    /// Compiles <c>{assemblyName}.cs</c> and <c>{assemblyName}Grpc.cs</c> from the plugins folder
    /// into <c>plugins/.{assemblyName}/{assemblyName}.dll</c>. Despite its name the method is synchronous.
    /// </summary>
    /// <param name="assemblyName">Proto file name without extension; also used as the assembly name.</param>
    /// <returns><c>true</c> when the DLL was emitted without errors; <c>false</c> when there was nothing to compile or compilation failed.</returns>
    private bool GenerateDllAsync(string assemblyName)
    {
        var dirpath = Path.Combine(BaseDirectory, "plugins");
        var sourceFiles = Directory.GetFiles(dirpath, "*.cs");
        if (sourceFiles.Length == 0)
            return false;

        var grpcFileName = string.Concat(assemblyName, "Grpc");
        List<SyntaxTree> trees = new List<SyntaxTree>();
        foreach (var file in sourceFiles)
        {
            // Ordinal comparison: a culture-sensitive ToLower() mismatches under e.g. tr-TR ("I" -> "ı").
            // NOTE(review): protoc appears to PascalCase file names (my_service.proto -> MyService.cs),
            // which this match would not find - confirm against the protoc version in use.
            var fileName = Path.GetFileNameWithoutExtension(file);
            var isMessageSource = string.Equals(fileName, assemblyName, StringComparison.OrdinalIgnoreCase);
            var isServiceSource = string.Equals(fileName, grpcFileName, StringComparison.OrdinalIgnoreCase);
            if (!isMessageSource && !isServiceSource)
                continue;

            trees.Add(CSharpSyntaxTree.ParseText(File.ReadAllText(file), encoding: Encoding.UTF8));
        }

        if (trees.Count == 0)
        {
            // Without this guard an empty assembly would be emitted and reported as success.
            this.logger.LogError("No generated source file found for '{AssemblyName}' in {Directory}", assemblyName, dirpath);
            return false;
        }

        var references2 = new[]{
            MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
            MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
        };
        var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);

        var dlldir = Path.Combine(dirpath, $".{assemblyName}");
        if (!Directory.Exists(dlldir))
            Directory.CreateDirectory(dlldir);

        var result2 = CSharpCompilation.Create(assemblyName, trees, references2, options).Emit(Path.Combine(dlldir, $"{assemblyName}.dll"));
        this.logger.Log(result2.Success ? LogLevel.Debug : LogLevel.Error, string.Join(",", result2.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition))));
        return result2.Success;
    }
}
/// <summary>
/// Invokes the bundled protoc tool (with the gRPC C# plugin) to generate C# code;
/// for a proto named xxx this produces xxx.cs and xxxGrpc.cs in the plugins folder.
/// </summary>
public class ProtoGenerateCode : IProtoGenerateCode
{
    // How long protoc may run before it is killed.
    private const int ProtocTimeoutMilliseconds = 5000;
    // How long to wait for the redirected output streams to finish after the process ended.
    private const int OutputDrainTimeoutMilliseconds = 1000;

    private readonly ILogger<ProtoGenerateCode> Logger = null;
    private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;

    public ProtoGenerateCode(ILogger<ProtoGenerateCode> logger)
    {
        this.Logger = logger;
    }

    /// <summary>
    /// Runs protoc once for all given proto files, writing the generated C# into
    /// <c>{BaseDirectory}/plugins</c>.
    /// </summary>
    /// <param name="protoPath">Paths of the .proto files to compile; nothing happens when null or empty.</param>
    /// <exception cref="InvalidOperationException">protoc exited with a non-zero code or timed out.</exception>
    public void GenerateCsharpFromProto(params string[] protoPath)
    {
        if (protoPath == null || protoPath.Length == 0)
            return;

        // OS architecture, e.g. x86 / x64; part of the tools folder name.
        var architecture = RuntimeInformation.OSArchitecture.ToString().ToLowerInvariant();
        var bin = string.Empty;
        string os;
        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            os = "windows";
            bin = ".exe";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
        {
            os = "linux";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
        {
            os = "macosx";
        }
        else
        {
            Logger.LogError("该平台不支持grpctools.");
            return;
        }

        var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
        var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");
        string outdir = Path.Combine(BaseDirectory, "plugins");
        if (!Directory.Exists(outdir))
            Directory.CreateDirectory(outdir);

        var arguments = BuildArguments(protoPath, outdir, grpcPath);
        Logger.LogInformation("Running: " + protocPath + " " + string.Join(" ", arguments));

        var exitCode = RunProtoc(protocPath, arguments, string.Empty, out string stdout, out string stderr);
        if (exitCode != 0)
            throw new InvalidOperationException(string.IsNullOrEmpty(stderr) ? $"protoc exited with code {exitCode}." : stderr);

        // protoc also writes warnings to stderr on success; report them without failing the run.
        if (!string.IsNullOrEmpty(stderr))
            Logger.LogWarning("protoc: {Output}", stderr);
    }

    /// <summary>
    /// Builds the protoc argument list: one --proto_path per distinct source directory, the C# and
    /// gRPC output options, then the proto files themselves (several files are compiled in one run).
    /// </summary>
    private static List<string> BuildArguments(string[] protoFiles, string outdir, string grpcPath)
    {
        var arguments = new List<string>();

        var includeDirs = protoFiles
            .Select(file => Path.GetDirectoryName(file))
            // A bare file name has no directory part; fall back to the current directory.
            .Select(dir => string.IsNullOrEmpty(dir) ? "." : dir)
            .Distinct(StringComparer.OrdinalIgnoreCase);
        foreach (var dir in includeDirs)
            arguments.Add("--proto_path=" + dir);

        arguments.Add("--csharp_out=" + outdir);
        arguments.Add("--plugin=protoc-gen-grpc=" + grpcPath);
        arguments.Add("--grpc_out=" + outdir);
        arguments.AddRange(protoFiles);
        return arguments;
    }

    /// <summary>
    /// Starts protoc, captures its output and waits for it to finish.
    /// </summary>
    /// <param name="path">Path of the protoc executable.</param>
    /// <param name="arguments">Individual arguments; passed through ArgumentList so paths containing spaces survive intact.</param>
    /// <param name="workingDir">Working directory, or empty to inherit the current one.</param>
    /// <param name="stdout">Captured standard output ("" if it could not be drained in time).</param>
    /// <param name="stderr">Captured standard error, prefixed with a timeout notice when the process was killed.</param>
    /// <returns>The process exit code, or -1 when the process timed out.</returns>
    private static int RunProtoc(string path, IEnumerable<string> arguments, string workingDir, out string stdout, out string stderr)
    {
        using (var proc = new Process())
        {
            var psi = proc.StartInfo;
            psi.FileName = path;
            foreach (var argument in arguments)
                psi.ArgumentList.Add(argument);
            if (!string.IsNullOrEmpty(workingDir))
                psi.WorkingDirectory = workingDir;
            psi.RedirectStandardError = psi.RedirectStandardOutput = true;
            psi.UseShellExecute = false;
            psi.CreateNoWindow = true;
            proc.Start();

            // Read both streams concurrently so a full pipe buffer cannot block the child process.
            var stdoutTask = proc.StandardOutput.ReadToEndAsync();
            var stderrTask = proc.StandardError.ReadToEndAsync();

            var timedOut = !proc.WaitForExit(ProtocTimeoutMilliseconds);
            if (timedOut)
            {
                try
                {
                    proc.Kill();
                }
                catch (Exception ex) when (ex is InvalidOperationException || ex is System.ComponentModel.Win32Exception)
                {
                    // The process exited on its own between the timeout and Kill, or is already terminating.
                }
                // Kill is asynchronous; give the process a bounded time to actually go away.
                proc.WaitForExit(ProtocTimeoutMilliseconds);
            }

            stdout = stdoutTask.Wait(OutputDrainTimeoutMilliseconds) ? stdoutTask.Result : "";
            stderr = stderrTask.Wait(OutputDrainTimeoutMilliseconds) ? stderrTask.Result : "";

            if (timedOut)
            {
                // ExitCode is not read here: it throws if the process has not exited yet.
                stderr = $"protoc did not finish within {ProtocTimeoutMilliseconds} ms and was killed. " + stderr;
                return -1;
            }
            return proc.ExitCode;
        }
    }
}
/// <summary>
/// Replacement for Ocelot's <see cref="IHttpRequester"/> that forwards gRPC routes over gRPC
/// and everything else over plain HTTP.
/// </summary>
/// <remarks>
/// The scheme is tested with a substring match on "grpc" rather than equality because, per the
/// original author, the request may also be made over https.
/// The downstream service also serves HTTP, so its gRPC port is taken to be the HTTP port plus 1000.
/// </remarks>
[Serializable]
public class GrpcHttpRequester : IHttpRequester
{
    // Offset between a downstream service's HTTP port and its gRPC port.
    private const int GrpcPortOffset = 1000;

    private readonly IHttpClientCache _cacheHandlers;
    private readonly IOcelotLogger _logger;
    private readonly IDelegatingHandlerHandlerFactory _factory;
    private readonly IExceptionToErrorMapper _mapper;

    public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
        IHttpClientCache cacheHandlers,
        IDelegatingHandlerHandlerFactory factory,
        IExceptionToErrorMapper mapper)
    {
        this._logger = loggerFactory.CreateLogger<HttpClientHttpRequester>();
        this._cacheHandlers = cacheHandlers;
        this._factory = factory;
        this._mapper = mapper;
    }

    /// <summary>
    /// Sends the downstream request, choosing gRPC when the downstream scheme contains "grpc".
    /// </summary>
    /// <param name="httpContext">Current request context carrying Ocelot's downstream request/route items.</param>
    /// <returns>The downstream response wrapped in an Ocelot <c>Response</c>.</returns>
    public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
    {
        var downstreamRequest = httpContext.Items.DownstreamRequest();
        // Ordinal: a URI scheme is an identifier, not linguistic text.
        if (downstreamRequest.Scheme.IndexOf("grpc", StringComparison.Ordinal) < 0)
            return await ProcessHttpResponse(httpContext);
        return await ProcessGrpcResponse(httpContext);
    }

    /// <summary>
    /// Resolves the gRPC method for the request and invokes it on the downstream host.
    /// Falls back to plain HTTP when no gRPC method can be resolved for the request.
    /// </summary>
    private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
    {
        Channel channel = null;
        try
        {
            GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
            if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
                return await this.ProcessHttpResponse(httpContext);

            var downStreamRqeust = httpContext.Items.DownstreamRequest();
            var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
            if (!string.IsNullOrEmpty(grpcRequestMessage.RequestVersion))
                options.Add(new ChannelOption("requestVersion", grpcRequestMessage.RequestVersion));

            // A channel is a long-lived connection and one is created per request here, so it is
            // shut down in the finally block. A channel pool (see https://github.com/leenux/GrpcPool)
            // would be the next step for high request volumes.
            channel = new Channel(downStreamRqeust.Host, Convert.ToInt32(downStreamRqeust.Port) + GrpcPortOffset, ChannelCredentials.Insecure, options);
            var client = new MethodDescriptorClient(channel);
            var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
            return new OkResponse<HttpResponseMessage>(httpResponseMessage);
        }
        catch (RpcException exception)
        {
            var error = _mapper.Map(exception);
            return new OKButFailResponse<HttpResponseMessage>(error);
        }
        catch (Exception exception)
        {
            var error = _mapper.Map(exception);
            return new ErrorResponse<HttpResponseMessage>(error);
        }
        finally
        {
            if (channel != null)
                await ShutdownChannelAsync(channel);
        }
    }

    /// <summary>
    /// Shuts the per-request channel down; a failure here is logged and must not replace the response.
    /// </summary>
    private async Task ShutdownChannelAsync(Channel channel)
    {
        try
        {
            await channel.ShutdownAsync();
        }
        catch (Exception exception)
        {
            _logger.LogWarning($"Failed to shut down gRPC channel to {channel.Target}: {exception.Message}");
        }
    }

    /// <summary>
    /// Forwards the downstream request over HTTP using Ocelot's <c>HttpClientBuilder</c>.
    /// </summary>
    private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
    {
        var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
        var downstreamRoute = httpContext.Items.DownstreamRoute();
        var downstreamRequest = httpContext.Items.DownstreamRequest();
        var httpClient = builder.Create(downstreamRoute);
        try
        {
            var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
            return new OkResponse<HttpResponseMessage>(response);
        }
        catch (Exception exception)
        {
            var error = _mapper.Map(exception);
            return new ErrorResponse<HttpResponseMessage>(error);
        }
        finally
        {
            builder.Save();
        }
    }
}
/// <summary>
/// Generic gRPC client that issues a call described by a method descriptor at runtime;
/// see <see cref="InvokeAsync"/> for the entry point.
/// </summary>
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
{
    // Open generic definition of CallGrpcAsyncCore, looked up once instead of on every request.
    private static readonly System.Reflection.MethodInfo CallGrpcAsyncCoreMethod =
        typeof(MethodDescriptorClient).GetMethod(nameof(CallGrpcAsyncCore), System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);

    public MethodDescriptorClient(Channel channel)
        : base(channel)
    {
    }

    public MethodDescriptorClient(CallInvoker callInvoker)
        : base(callInvoker)
    {
    }

    public MethodDescriptorClient()
        : base()
    {
    }

    protected MethodDescriptorClient(ClientBaseConfiguration configuration)
        : base(configuration)
    {
    }

    protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
    {
        return new MethodDescriptorClient(configuration);
    }

    /// <summary>
    /// Invokes the gRPC method referenced by <paramref name="grpcRequestMessage"/>. The request and
    /// response CLR types are taken from the method descriptor and bound via reflection.
    /// </summary>
    /// <param name="grpcRequestMessage">Request carrying the method descriptor, headers and JSON body.</param>
    /// <returns>An HTTP 200 response whose body is the JSON-serialized gRPC response(s).</returns>
    public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
    {
        var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;
        var closedMethod = CallGrpcAsyncCoreMethod.MakeGenericMethod(methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType);
        return (Task<HttpResponseMessage>)closedMethod.Invoke(this, new object[] { grpcRequestMessage });
    }

    /// <summary>
    /// Deserializes the JSON request body into <typeparamref name="TRequest"/>, performs the call
    /// according to the method's type (unary or one of the streaming kinds) and converts the result
    /// to an HTTP response.
    /// </summary>
    /// <exception cref="NotSupportedException">The method type is not one of the four gRPC call types.</exception>
    private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage) where TRequest : class, IMessage<TRequest> where TResponse : class, IMessage<TResponse>
    {
        CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
        var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);
        var requestMessage = await grpcRequestMessage.RequestMessage.Content.ReadAsStringAsync();
        TRequest request = JsonConvert.DeserializeObject<TRequest>(requestMessage);
        // The HTTP body holds a single message; the streaming variants send it as a one-element stream.
        List<TRequest> requests = new List<TRequest>() { request };
        switch (rpc.Type)
        {
            case MethodType.Unary:
                var taskUnary = await AsyncUnaryCall(CallInvoker, rpc, option, request);
                return await ProcessHttpResponseMessage(taskUnary.Item2, taskUnary.Item1);
            case MethodType.ClientStreaming:
                var taskClientStreaming = await AsyncClientStreamingCall(CallInvoker, rpc, option, requests);
                return await ProcessHttpResponseMessage(taskClientStreaming.Item2, taskClientStreaming.Item1);
            case MethodType.ServerStreaming:
                var taskServerStreaming = await AsyncServerStreamingCall(CallInvoker, rpc, option, request);
                return await ProcessHttpResponseMessage(taskServerStreaming.Item2, taskServerStreaming.Item1);
            case MethodType.DuplexStreaming:
                var taskDuplexStreaming = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, requests);
                return await ProcessHttpResponseMessage(taskDuplexStreaming.Item2, taskDuplexStreaming.Item1);
            default:
                throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
        }
    }

    /// <summary>
    /// Builds an HTTP 200 response with the responses serialized as JSON and the gRPC response
    /// headers copied over as HTTP headers.
    /// </summary>
    /// <param name="headers">Response headers returned by the gRPC call.</param>
    /// <param name="responses">Response payload(s) to serialize.</param>
    private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
    {
        HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
        httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses));
        if (headers != null)
        {
            foreach (var entry in headers)
            {
                // Entry.Value throws for binary ("-bin") entries, so those are forwarded base64-encoded.
                var value = entry.IsBinary ? Convert.ToBase64String(entry.ValueBytes) : entry.Value;
                // TryAddWithoutValidation: a header HttpClient considers invalid or content-only is
                // skipped instead of turning a successful gRPC call into an error.
                httpResponseMessage.Headers.TryAddWithoutValidation(entry.Key, value);
            }
        }
        return Task.FromResult(httpResponseMessage);
    }

    /// <summary>
    /// Converts the incoming HTTP request headers into gRPC call metadata. A "grpc." fragment is
    /// removed from header names and only the first value of each header is forwarded.
    /// </summary>
    private CallOptions CreateCallOptions(HttpRequestHeaders headers)
    {
        Metadata meta = new Metadata();
        if (headers != null)
        {
            foreach (var entry in headers)
            {
                var value = entry.Value.FirstOrDefault();
                // Metadata rejects null values; a header without a value carries nothing to forward.
                if (value == null)
                    continue;
                meta.Add(entry.Key.Replace("grpc.", ""), value);
            }
        }
        return new CallOptions(meta);
    }

    /// <summary>
    /// Performs a unary call: one request, one response.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Call invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="request">Request message.</param>
    /// <returns>The response together with the response headers.</returns>
    private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
    {
        using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
        {
            return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>
    /// Performs a client-streaming call: writes every request, completes the stream, then awaits
    /// the single response.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Call invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="requests">Request messages to stream; may be null, in which case nothing is written.</param>
    /// <returns>The response together with the response headers.</returns>
    private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
    {
        using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
        {
            if (requests != null)
            {
                foreach (TRequest request in requests)
                    await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
            }
            await call.RequestStream.CompleteAsync().ConfigureAwait(false);
            return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>
    /// Performs a server-streaming call: sends one request and collects every streamed response.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Call invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="request">Request message.</param>
    /// <returns>All responses, in arrival order, together with the response headers.</returns>
    private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
    {
        using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
        {
            IList<TResponse> responses = new List<TResponse>();
            while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                responses.Add(call.ResponseStream.Current);
            return Tuple.Create(responses, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>
    /// Performs a duplex-streaming call: writes every request, completes the request stream, then
    /// collects every streamed response.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Call invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="requests">Request messages to stream; may be null, in which case nothing is written.</param>
    /// <returns>All responses, in arrival order, together with the response headers.</returns>
    private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
    {
        using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
        {
            if (requests != null)
            {
                foreach (TRequest request in requests)
                    await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
            }
            await call.RequestStream.CompleteAsync().ConfigureAwait(false);
            IList<TResponse> responses = new List<TResponse>();
            while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                responses.Add(call.ResponseStream.Current);
            return Tuple.Create(responses, await call.ResponseHeadersAsync);
        }
    }
}
MethodDescriptorClient 类在获取Request的请求参数时,目前只支持StringContent(httpClient以POST方式请求到后端时,取得的都是ByteArrayContent)。
所以客户端的调用方式如下:
// Client-side sample: POSTs a JSON body through the gateway to either the "grpc" or the "api"
// route of Supplier/QuerySupplier and shows the response body in the text box.
// The body is sent as JSON StringContent; MultipartFormDataContent is not used here.
// NOTE: GetAwaiter().GetResult() blocks the calling thread until the request completes.
using (HttpClient httpClient = new HttpClient())
{
    var queryurl = queryUrl();
    // Ordinal: the trailing slash check is on a URL, not linguistic text.
    if (!queryurl.EndsWith("/", StringComparison.Ordinal))
        queryurl += "/";

    var postObj = new { WhereString = " no like %'1.402.03.01674'%" };
    var requestUrl = string.Concat(queryurl, usegrpc ? "grpc" : "api", "/", "Supplier", "/", "QuerySupplier");

    // Dispose the content and the response so their buffers and the connection are released promptly.
    using (StringContent content = new StringContent(JsonConvert.SerializeObject(postObj), Encoding.UTF8, "application/json"))
    using (var responseMessage = httpClient.PostAsync(requestUrl, content).GetAwaiter().GetResult())
    {
        this.textBox1.Text = JsonConvert.SerializeObject(responseMessage.Content.ReadAsStringAsync().GetAwaiter().GetResult());
    }
}
网关注册如下
/// <summary>
/// Registers Ocelot on the service collection, then calls AddOcelotGrpc on the builder it returns.
/// </summary>
/// <param name="services">Service collection to register into.</param>
public void ConfigureServices(IServiceCollection services)
{
    var ocelotBuilder = services.AddOcelot();
    ocelotBuilder.AddOcelotGrpc();
}
网关服务需要放置protoc程序。第二个目录用来放置proto文件
Git地址:Sam/Ocelot.Provider.RPC
|