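Preface
Envoy is a high-performance network proxy designed for the service mesh. It runs alongside the application and abstracts the network by providing common features in a platform-agnostic way. When all service traffic in an infrastructure flows through an Envoy mesh, consistent observability makes it easy to locate problem areas and tune overall performance.
Envoy is also one of the core components of Istio. It runs next to each service as a sidecar, intercepts and forwards the service's traffic, and provides powerful features such as routing and traffic control. In this series we go beyond the official Istio and Envoy documentation and work at the source-code level: we walk through Envoy startup, traffic hijacking, and the HTTP request processing flow with advanced examples, and analyze Envoy's architecture in depth.
This is the second article in the series on Envoy's request flow at the source level. It covers the first half of the outbound direction: starting to listen and establishing a connection. Note: the issues and PRs discussed here reflect the state of the project as of December 2021.
Envoy wraps libevent to handle file events, timer events, and similar operations. Those wrappers, along with dispatching through the dispatcher object, deferred deletion, worker startup, and binding listeners to workers, are not covered here and may be analyzed separately later. We skip Envoy's event loop model and start from the point where a request arrives.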
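Outbound direction
Filter analysis
Starting to listen
- Envoy obtains its listener configuration through xDS or static configuration.
- If a listener has bind_to_port set, Envoy calls the libevent API directly to bind and listen, with ListenerImpl::listenCallback as the callback.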
    void ListenerManagerImpl::addListenerToWorker(Worker& worker,
                                                  absl::optional<uint64_t> overridden_listener,
                                                  ListenerImpl& listener,
                                                  ListenerCompletionCallback completion_callback) {
      if (overridden_listener.has_value()) {
        ENVOY_LOG(debug, "replacing existing listener {}", overridden_listener.value());
        worker.addListener(overridden_listener, listener, [this, completion_callback](bool) -> void {
          server_.dispatcher().post([this, completion_callback]() -> void {
            stats_.listener_create_success_.inc();
            if (completion_callback) {
              completion_callback();
            }
          });
        });
        return;
      }
      worker.addListener(
          overridden_listener, listener, [this, &listener, completion_callback](bool success) -> void {
            // The add listener completion runs on the worker thread. Post back to the main thread to
            // avoid locking.
            server_.dispatcher().post([this, success, &listener, completion_callback]() -> void {
              // ... (the snippet is cut off here in the original)

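The comment in the snippet above notes that the completion callback runs on the worker thread and is posted back to the main thread so that main-thread state needs no locking of its own. A minimal sketch of that pattern follows. `MiniDispatcher`, `ListenerStats`, and `onWorkerAddListenerDone` are hypothetical names made up for illustration, not Envoy's `Event::Dispatcher` API; the only lock is the one that guards the queue.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for a dispatcher: callbacks posted from any thread
// are queued and later executed on the thread that calls runPosted().
class MiniDispatcher {
public:
  void post(std::function<void()> cb) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::move(cb));
  }

  // Runs every queued callback on the calling thread; returns how many ran.
  int runPosted() {
    std::queue<std::function<void()>> pending;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      std::swap(pending, queue_);
    }
    int ran = 0;
    while (!pending.empty()) {
      pending.front()();
      pending.pop();
      ++ran;
    }
    return ran;
  }

private:
  std::mutex mutex_;
  std::queue<std::function<void()>> queue_;
};

// Hypothetical main-thread state; it is only modified inside posted callbacks.
struct ListenerStats {
  int listener_create_success = 0;
};

// In a real program this would be called on a worker thread once the listener
// has been added. It does not touch `stats` directly; it posts the update.
inline void onWorkerAddListenerDone(MiniDispatcher& main_dispatcher, ListenerStats& stats,
                                    bool success) {
  main_dispatcher.post([&stats, success]() {
    if (success) {
      ++stats.listener_create_success;
    }
  });
}
```

The counter changes only when the main thread drains the queue with `runPosted()`, which is why the stats object itself needs no mutex in this sketch.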
    void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,
                                       bool bind_to_port) {
      setListenSocketOptions(options);

      if (bind_to_port) {
        bind(address_provider_->localAddress());
      }
    }

    ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
                                         Network::ListenerConfig& config)
        : ActiveTcpListener(
              parent,
              parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this,
                                                 config.bindToPort(), config.tcpBacklogSize()),
              config) {}

    class ActiveTcpListener : public Network::TcpListenerCallbacks,
                              public ActiveListenerImplBase,
                              public Network::BalancedConnectionHandler,
                              Logger::Loggable<Logger::Id::conn_handler> {
    public:
      ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config);
      ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerPtr&& listener,
                        Network::ListenerConfig& config);
      ~ActiveTcpListener() override;
      bool listenerConnectionLimitReached() const {
        // TODO(tonya11en): Delegate enforcement of per-listener connection limits to overload
        // manager.
        return !config_->openConnections().canCreate();
      }

      void decNumConnections() {
        ASSERT(num_listener_connections_ > 0);
        --num_listener_connections_;
        config_->openConnections().dec();
      }

      // Network::TcpListenerCallbacks
      void onAccept(Network::ConnectionSocketPtr&& socket) override;
      void onReject(RejectCause) override;
      // ... (the rest of the class is omitted in the original)

    listener_.reset(
        // Arguments: the libevent base, the callback (a method of this class), this object,
        // and finally the socket's file descriptor.
        evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));

    if (!listener_) {
      throw CreateListenerException(
          fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
    }

    if (!Network::Socket::applyOptions(socket.options(), socket,
                                       envoy::api::v3::core::SocketOption::STATE_LISTENING)) {
      throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",
                                                socket.localAddress()->asString()));
    }

    evconnlistener_set_error_cb(listener_.get(), errorCallback);

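About reuseport
- https://github.com/envoyproxy/envoy/issues/4602#issuecomment-544704931
- https://github.com/envoyproxy/envoy/issues/8794
- https://lwn.net/Articles/542629/
- https://tech.flipkart.com/linux-tcp-so-reuseport-usage-and-implementation-6bfbf642885a
With SO_REUSEPORT, multiple server sockets listen on the same port, and each server socket corresponds to one listening thread. When the kernel TCP stack receives a connection request (SYN) from a client, it hashes the TCP 4-tuple (srcIP, srcPort, destIP, destPort) to select one listening thread and wakes it up; the new connection is bound to that thread. Compared with running without SO_REUSEPORT, connections are therefore spread more evenly across threads, although a hash does not guarantee a perfectly even split.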
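To make the hash-based selection concrete, here is a small simulation. It is only a sketch: `FourTuple`, `mix64`, `pickListener`, and `distribute` are names invented for this example, and the mixing function is a generic 64-bit finalizer, not the hash the Linux kernel actually uses.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// The four fields the paragraph above says are hashed.
struct FourTuple {
  uint32_t src_ip;
  uint16_t src_port;
  uint32_t dst_ip;
  uint16_t dst_port;
};

// Generic 64-bit mixing step (splitmix64-style finalizer). This is an
// illustrative choice and NOT the kernel's hash function.
inline uint64_t mix64(uint64_t x) {
  x ^= x >> 30;
  x *= 0xbf58476d1ce4e5b9ULL;
  x ^= x >> 27;
  x *= 0x94d049bb133111ebULL;
  x ^= x >> 31;
  return x;
}

// Maps a connection's 4-tuple to one of num_listeners listening sockets.
inline std::size_t pickListener(const FourTuple& t, std::size_t num_listeners) {
  const uint64_t addrs = (static_cast<uint64_t>(t.src_ip) << 32) | t.dst_ip;
  const uint64_t ports = (static_cast<uint64_t>(t.src_port) << 16) | t.dst_port;
  return static_cast<std::size_t>(mix64(addrs ^ mix64(ports)) % num_listeners);
}

// Simulates `connections` clients that differ only in source port and counts
// how many connections each listener receives.
inline std::vector<int> distribute(int connections, std::size_t num_listeners) {
  std::vector<int> counts(num_listeners, 0);
  for (int i = 0; i < connections; ++i) {
    const FourTuple t{0x0A000001u, static_cast<uint16_t>(20000 + i), 0x0A000002u, 15001};
    ++counts[pickListener(t, num_listeners)];
  }
  return counts;
}
```

The same 4-tuple always maps to the same listener, while different source ports scatter across listeners. The split is approximately, not exactly, even, which matches the caveat about hashing above.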
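Envoy supports enabling this feature on a listener, but in hot restart scenarios it places a requirement on the kernel version (4.19-rc1 or later). See the listener API reference:
https://www.envoyproxy.io/docs/envoy/v1.18.3/api-v3/config/listener/v3/listener.proto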
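At the socket level, what the listener option enables is the SO_REUSEPORT socket option. The sketch below assumes Linux and plain POSIX sockets; `listenWithReusePort` and `boundPort` are hypothetical helper names and not part of Envoy. It shows two independent listening sockets sharing one port.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstdint>

// Creates a TCP socket with SO_REUSEPORT set, binds it to 127.0.0.1:port and
// starts listening. Returns the fd, or -1 on failure. Passing port == 0 lets
// the kernel choose a free port.
inline int listenWithReusePort(uint16_t port) {
  const int fd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) {
    return -1;
  }
  const int on = 1;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(port);
  // The option has to be set before bind() on every socket that shares the port.
  if (::setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0 ||
      ::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      ::listen(fd, 128) != 0) {
    ::close(fd);
    return -1;
  }
  return fd;
}

// Returns the local port a socket is bound to, or 0 on failure.
inline uint16_t boundPort(int fd) {
  sockaddr_in addr{};
  socklen_t len = sizeof(addr);
  if (::getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) != 0) {
    return 0;
  }
  return ntohs(addr.sin_port);
}
```

Calling `listenWithReusePort(0)` and then `listenWithReusePort(boundPort(first_fd))` yields two listening sockets on the same port; without the option, the second `bind()` would fail with EADDRINUSE.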
Verification
The option is off by default. After it is enabled through an EnvoyFilter, port 15001 shows up as open:
    apiVersion: networking.istio.io/v1alpha3
    kind: EnvoyFilter
    metadata:
      name: reuseport
      namespace: testhl
    spec:
      workloadSelector:
        labels:
          app: asm-0
      configPatches:
        - applyTo: LISTENER
          match:
            context: SIDECAR_OUTBOUND
            listener:
              portNumber: 15001
              name: "virtualOutbound"
          patch:
            operation: MERGE
            value:
              reuse_port: true
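The Pod must be restarted for the change to take effect.
By contrast, for a workload without reuseport applied:
The distribution is roughly even.
For strictly even connection balancing, try the listener option connection_balance_config with exact_balance. Because it takes a lock, it probably costs some performance when new connections arrive at a high rate. It currently applies only to TCP listeners.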
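The idea behind exact balancing, and the reason a lock is involved, can be sketched as follows. This is an assumption-laden illustration: `WorkerHandler` and `ExactBalancerSketch` are hypothetical types, and the code only mirrors the idea of choosing the least-loaded handler under a mutex rather than reproducing Envoy's implementation.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical per-worker handler that only tracks its connection count.
struct WorkerHandler {
  uint64_t num_connections = 0;
};

class ExactBalancerSketch {
public:
  void registerHandler(WorkerHandler& handler) {
    std::lock_guard<std::mutex> lock(mutex_);
    handlers_.push_back(&handler);
  }

  // Called by the handler that accepted the socket. Picks the handler with the
  // fewest connections and charges the new connection to it while the lock is
  // still held, so concurrent accepts cannot all choose the same target.
  WorkerHandler& pickTargetHandler(WorkerHandler& current) {
    std::lock_guard<std::mutex> lock(mutex_);
    WorkerHandler* best = &current;
    for (WorkerHandler* handler : handlers_) {
      if (handler->num_connections < best->num_connections) {
        best = handler;
      }
    }
    ++best->num_connections;
    return *best;
  }

private:
  std::mutex mutex_;
  std::vector<WorkerHandler*> handlers_;
};
```

Every new connection passes through the same mutex, which is the cost mentioned above: the balance is exact, but all workers serialize briefly on each accept.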
    Network::BalancedConnectionHandlerOptRef new_listener;

    if (hand_off_restored_destination_connections_ &&
        socket_->addressProvider().localAddressRestored()) {
      // Find a listener associated with the original destination address.
      new_listener =
          listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
    }

    if (!rebalanced) {
      Network::BalancedConnectionHandler& target_handler =
          config_->connectionBalancer().pickTargetHandler(*this);
      if (&target_handler != this) {
        target_handler.post(std::move(socket));
        return;
      }
    }

    auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                           hand_off_restored_destination_connections);

    // Create and run the filters
    config_->filterChainFactory().createListenerFilterChain(*active_socket);
    active_socket->continueFilterChain(true);

    Network::BalancedConnectionHandlerOptRef
    ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
      // This is a linear operation, may need to add a map<address, listener> to improve performance.
      // However, linear performance might be adequate since the number of listeners is small.
      // We do not return stopped listeners.
      auto listener_it =
          std::find_if(listeners_.begin(), listeners_.end(),
                       [&address](std::pair<Network::Address::InstanceConstSharedPtr,
                                            ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                         return p.second.tcpListener().has_value() &&
                                p.second.listener_->listener() != nullptr &&
                                p.first->type() == Network::Address::Type::Ip && *(p.first) == address;
                       });

      // If there is exact address match, return the corresponding listener.
      if (listener_it != listeners_.end()) {
        return Network::BalancedConnectionHandlerOptRef(
            listener_it->second.tcpListener().value().get());
      }

      // Otherwise, we need to look for the wild card match, i.e., 0.0.0.0:[address_port].
      // We do not return stopped listeners.
      // TODO(wattli): consolidate with previous search for more efficiency.
      if (Runtime::runtimeFeatureEnabled(
              "envoy.reloadable_features.listener_wildcard_match_ip_family")) {
        listener_it =
            std::find_if(listeners_.begin(), listeners_.end(),
                         [&address](const std::pair<Network::Address::InstanceConstSharedPtr,
                                                    ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                           // ... (the snippet is cut off here in the original)
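Establishing a connection
- DispatcherImpl receives the request through libevent and calls ListenerImpl::listenCallback.
- The client opens a connection to Envoy. An Envoy worker receives the callback from its event loop, which triggers Envoy::Network::ListenerImpl::listenCallback (port: 15001).
- The listener on 15001 has "useOriginalDst": true, so accept_filters_ contains OriginalDstFilter.
- In OriginalDstFilter::onAccept, os_syscalls.getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len) retrieves the destination IP as it was before iptables rewrote it (related reading: "iptables and getsockopt").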
    Network::Address::InstanceConstSharedPtr OriginalDstFilter::getOriginalDst(Network::Socket& sock) {
      return Network::Utility::getOriginalDst(sock);
    }

    sockaddr_storage orig_addr;
    memset(&orig_addr, 0, sizeof(orig_addr));
    socklen_t addr_len = sizeof(sockaddr_storage);
    int status;

    if (*ipVersion == Address::IpVersion::v4) {
      status = sock.getSocketOption(SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;
    } else {
      status = sock.getSocketOption(SOL_IPV6, IP6T_SO_ORIGINAL_DST, &orig_addr, &addr_len).rc_;
    }

    if (status != 0) {
      return nullptr;
    }

    return Address::addressFromSockAddr(orig_addr, 0, true /* default for v6 constructor */);
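Outside Envoy's wrappers, the same lookup can be tried with a plain getsockopt call. The following is a minimal IPv4-only sketch for Linux. `OriginalDst` and `originalDstV4` are hypothetical names, and the fallback definition of SO_ORIGINAL_DST (80) assumes the value from <linux/netfilter_ipv4.h>.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstdint>
#include <string>

#ifndef SOL_IP
#define SOL_IP IPPROTO_IP
#endif
#ifndef SO_ORIGINAL_DST
#define SO_ORIGINAL_DST 80 // assumed to match <linux/netfilter_ipv4.h>
#endif

// Hypothetical result type for this sketch.
struct OriginalDst {
  std::string ip;
  uint16_t port = 0;
};

// Asks the kernel for the destination the client originally dialed, before an
// iptables REDIRECT/DNAT rule rewrote it. Returns false when the socket has no
// such record (for example, it was never redirected).
inline bool originalDstV4(int fd, OriginalDst& out) {
  sockaddr_in orig{};
  socklen_t len = sizeof(orig);
  if (::getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig, &len) != 0) {
    return false;
  }
  char buf[INET_ADDRSTRLEN] = {0};
  if (::inet_ntop(AF_INET, &orig.sin_addr, buf, sizeof(buf)) == nullptr) {
    return false;
  }
  out.ip = buf;
  out.port = ntohs(orig.sin_port);
  return true;
}
```

On a socket accepted behind the sidecar's iptables redirect this would report the address the application dialed; on a socket that was not redirected the call fails, which corresponds to the `return nullptr` branch in the snippet above.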
- newConnection also uses getBalancedHandlerByAddress to find the actual virtual listener.
    void ActiveTcpSocket::newConnection() {
      connected_ = true;

      // Check if the socket may need to be redirected to another listener.
      Network::BalancedConnectionHandlerOptRef new_listener;

      if (hand_off_restored_destination_connections_ &&
          socket_->addressProvider().localAddressRestored()) {
        // Find a listener associated with the original destination address.
        new_listener =
            listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
      }

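- The lookup goes through ConnectionHandlerImpl::getBalancedHandlerByAddress (the original text names ConnectionHandlerImpl::findActiveListenerByTag here, but the code that follows is the address-based lookup):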
    Network::BalancedConnectionHandlerOptRef
    ConnectionHandlerImpl::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
      // This is a linear operation, may need to add a map<address, listener> to improve performance.
      // However, linear performance might be adequate since the number of listeners is small.
      // We do not return stopped listeners.
      auto listener_it =
          std::find_if(listeners_.begin(), listeners_.end(),
                       [&address](std::pair<Network::Address::InstanceConstSharedPtr,
                                            ConnectionHandlerImpl::ActiveListenerDetails>& p) {
                         return p.second.tcpListener().has_value() &&
                                p.second.listener_->listener() != nullptr &&
                                p.first->type() == Network::Address::Type::Ip && *(p.first) == address;
                       });

      // If there is exact address match, return the corresponding listener.
      if (listener_it != listeners_.end()) {
        return Network::BalancedConnectionHandlerOptRef(
            listener_it->second.tcpListener().value().get());
      }

This finds the listener that corresponds to the address.
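The two-step search (exact address first, then the 0.0.0.0 wildcard on the same port, as the comments in the Envoy source describe) can be illustrated with a simplified model. `ListenerEntry` and `findListener` are hypothetical, and addresses are plain strings here rather than Envoy's address objects.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified listener record.
struct ListenerEntry {
  std::string ip;
  uint16_t port;
  std::string name;
};

// Linear search, as in the snippet above: an exact ip:port match wins;
// otherwise fall back to a wildcard listener (0.0.0.0) on the same port.
// Returns nullptr when neither exists.
inline const ListenerEntry* findListener(const std::vector<ListenerEntry>& listeners,
                                         const std::string& ip, uint16_t port) {
  for (const ListenerEntry& listener : listeners) {
    if (listener.ip == ip && listener.port == port) {
      return &listener;
    }
  }
  for (const ListenerEntry& listener : listeners) {
    if (listener.ip == "0.0.0.0" && listener.port == port) {
      return &listener;
    }
  }
  return nullptr;
}
```

In this model a connection whose restored destination is 10.0.0.5:80 would be handed to a listener bound to 10.0.0.5:80 if one exists, and otherwise to a 0.0.0.0:80 listener; with no match the lookup returns nothing and the connection stays where it was accepted.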
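- dispatcher.createServerConnection takes the accepted fd and creates the server connection object ConnectionImpl, then registers onFileEvent with the event loop to wait for read and write events. Like the non-blocking listening socket it was accepted from, the new socket is non-blocking. Note that on Linux an accepted socket does not inherit O_NONBLOCK automatically, so the accept path has to request it explicitly (for example with accept4 and SOCK_NONBLOCK).
- The file event is registered in epoll's edge-triggered mode.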
    auto server_conn_ptr = parent_.dispatcher().createServerConnection(
        std::move(socket), std::move(transport_socket), *stream_info);

    Network::ServerConnectionPtr
    DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                           Network::TransportSocketPtr&& transport_socket,
                                           StreamInfo::StreamInfo& stream_info) {
      ASSERT(isThreadSafe());
      return std::make_unique<Network::ServerConnectionImpl>(
          *this, std::move(socket), std::move(transport_socket), stream_info, true);
    }

    class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
    public:
      ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                           TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info,
                           bool connected);

      // ServerConnection impl
      void setTransportSocketConnectTimeout(std::chrono::milliseconds timeout) override;
      void raiseEvent(ConnectionEvent event) override;

    private:
      void onTransportSocketConnectTimeout();

      bool transport_connect_pending_{true};
      // Implements a timeout for the transport socket signaling connection. The timer is enabled by a
      // call to setTransportSocketConnectTimeout and is reset when the connection is established.
      Event::TimerPtr transport_socket_connect_timer_;
    };

    Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;

    // We never ask for both early close and read at the same time. If we are reading, we want to
    // consume all available data.
    socket_->ioHandle().initializeFileEvent(
        dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
        Event::FileReadyType::Read | Event::FileReadyType::Write);

    transport_socket_->setTransportSocketCallbacks(*this);

    constexpr FileTriggerType determinePlatformPreferredEventType() {
    #if defined(WIN32) || defined(FORCE_LEVEL_EVENTS)
      return FileTriggerType::EmulatedEdge;
    #else
      return FileTriggerType::Edge;
    #endif
    }

    static constexpr FileTriggerType PlatformDefaultTriggerType = determinePlatformPreferredEventType();
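Edge-triggered registration has one practical consequence: a readiness event is reported once per state change, so the handler has to keep reading until the kernel returns EAGAIN. This is also why the socket must be non-blocking. The Linux-only sketch below uses raw epoll rather than Envoy's or libevent's wrappers; `drainUntilEagain` and `waitAndDrain` are hypothetical helper names.

```cpp
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>

// Reads from a non-blocking fd until the kernel has no more data (EAGAIN) or
// the peer closed. Returns the total number of bytes read, or -1 on error.
inline long drainUntilEagain(int fd) {
  char buf[4]; // deliberately tiny so that several reads are needed
  long total = 0;
  for (;;) {
    const ssize_t n = ::read(fd, buf, sizeof(buf));
    if (n > 0) {
      total += n;
      continue;
    }
    if (n == 0) {
      return total; // end of stream
    }
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      return total; // fully drained; wait for the next edge
    }
    if (errno == EINTR) {
      continue;
    }
    return -1;
  }
}

// Registers fd with EPOLLET, waits for at most one readiness event and drains
// it. Returns bytes read, 0 if the wait timed out, or -1 on error.
inline long waitAndDrain(int fd, int timeout_ms) {
  const int ep = ::epoll_create1(0);
  if (ep < 0) {
    return -1;
  }
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLET;
  ev.data.fd = fd;
  long result = -1;
  if (::epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev) == 0) {
    epoll_event out{};
    const int n = ::epoll_wait(ep, &out, 1, timeout_ms);
    if (n == 1) {
      result = drainUntilEagain(out.data.fd);
    } else if (n == 0) {
      result = 0;
    }
  }
  ::close(ep);
  return result;
}
```

If the handler read only one small chunk per event, the remaining bytes would sit in the socket buffer with no further notification until new data arrived. The comment in the Envoy snippet about consuming all available data when reading fits the same requirement.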
- For an HTTP listener, the entry in filters is envoy.http_connection_manager. buildFilterChain adds Http::ConnectionManagerImpl to upstream_filters_ (a list), so that when request data arrives, the read callback of http_connection_manager (onData) can be invoked.
    void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
      ASSERT(connection_.state() == Connection::State::Open);
      ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
      filter->initializeReadFilterCallbacks(*new_filter);
      LinkedList::moveIntoListBack(std::move(new_filter), upstream_filters_);
    }

    CodecClient::CodecClient(Type type, Network::ClientConnectionPtr&& connection,
                             Upstream::HostDescriptionConstSharedPtr host,
                             Event::Dispatcher& dispatcher)
        : type_(type), host_(host), connection_(std::move(connection)),
          idle_timeout_(host_->cluster().idleTimeout()) {
      if (type_ != Type::HTTP3) {
        // Make sure upstream connections process data and then the FIN, rather than processing
        // TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
        // details)
        connection_->detectEarlyCloseWhenReadDisabled(false);
      }
      connection_->addConnectionCallbacks(*this);
      connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});

      connection_->noDelay(true);
      // ... (the snippet is cut off here in the original)

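The read filter list can be pictured as an ordered chain that incoming bytes travel through until one filter stops the iteration. The sketch below is a deliberately reduced model: `MiniFilterManager`, `UppercaseFilter`, and `TerminalFilter` are invented for illustration, and only the names `onData`, `Continue`, and `StopIteration` are borrowed from Envoy's network filter vocabulary.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>

enum class FilterStatus { Continue, StopIteration };

class ReadFilter {
public:
  virtual ~ReadFilter() = default;
  virtual FilterStatus onData(std::string& data) = 0;
};

// Keeps filters in insertion order and offers incoming data to each in turn.
class MiniFilterManager {
public:
  void addReadFilter(std::shared_ptr<ReadFilter> filter) {
    filters_.push_back(std::move(filter));
  }

  // Returns how many filters saw the data before iteration stopped.
  int onRead(std::string& data) {
    int visited = 0;
    for (const std::shared_ptr<ReadFilter>& filter : filters_) {
      ++visited;
      if (filter->onData(data) == FilterStatus::StopIteration) {
        break;
      }
    }
    return visited;
  }

private:
  std::list<std::shared_ptr<ReadFilter>> filters_;
};

// Toy filter that rewrites the buffer in place and lets the chain continue.
class UppercaseFilter : public ReadFilter {
public:
  FilterStatus onData(std::string& data) override {
    for (std::size_t i = 0; i < data.size(); ++i) {
      data[i] = static_cast<char>(std::toupper(static_cast<unsigned char>(data[i])));
    }
    return FilterStatus::Continue;
  }
};

// Toy terminal filter, standing in for a connection manager that consumes the
// bytes and ends the chain.
class TerminalFilter : public ReadFilter {
public:
  FilterStatus onData(std::string& data) override {
    received = data;
    return FilterStatus::StopIteration;
  }
  std::string received;
};
```

With `UppercaseFilter` added before `TerminalFilter`, both filters see the data and the terminal filter receives the rewritten bytes. Placing the terminal filter first ends the chain after a single call.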
- When the connection is first added to the event loop, the Write event fires immediately, but because write_buffer_ is empty, nothing is written.
    void CodecClient::onEvent(Network::ConnectionEvent event) {
      if (event == Network::ConnectionEvent::Connected) {
        ENVOY_CONN_LOG(debug, "connected", *connection_);
        connection_->streamInfo().setDownstreamSslConnection(connection_->ssl());
        connected_ = true;
      }

      if (event == Network::ConnectionEvent::RemoteClose) {
        remote_closed_ = true;
      }

      // HTTP/1 can signal end of response by disconnecting. We need to handle that case.
      if (type_ == Type::HTTP1 && event == Network::ConnectionEvent::RemoteClose &&
          !active_requests_.empty()) {
        Buffer::OwnedImpl empty;
        onData(empty);
      }

      if (event == Network::ConnectionEvent::RemoteClose ||
          event == Network::ConnectionEvent::LocalClose) {
        ENVOY_CONN_LOG(debug, "disconnect. resetting {} pending requests", *connection_,
                       active_requests_.size());
        disableIdleTimer();
        idle_timer_.reset();
        StreamResetReason reason = StreamResetReason::ConnectionFailure;
        if (connected_) {
          reason = StreamResetReason::ConnectionTermination;
          if (protocol_error_) {
            if (Runtime::runtimeFeatureEnabled(
                    "envoy.reloadable_features.return_502_for_upstream_protocol_errors")) {
              reason = StreamResetReason::ProtocolError;
              connection_->streamInfo().setResponseFlag(
                  StreamInfo::ResponseFlag::UpstreamProtocolError);
            }
          }
        }
        while (!active_requests_.empty()) {
          // Fake resetting all active streams so that reset() callbacks get invoked.
          active_requests_.front()->encoder_->getStream().resetStream(reason);
        }
      }
    }
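About the Cloud Native Classroom (云原生小课堂)
The Cloud Native Classroom is a non-profit series of technical talks and articles run jointly by Alauda (灵雀云), the Kube-OVN community, and the Cloud Native Technology Community. It shares cutting-edge cloud native technology and hands-on practice in a variety of formats.
Against the backdrop of digital transformation, cloud native has become a core driver of enterprise innovation. As one of the first vendors in China to turn Kubernetes into a product, Alauda has been cloud native from the start and aims to help enterprises complete their digital transformation.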
Related reading:
Cloud Native Classroom | Envoy request flow source code analysis (1): traffic hijacking