IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> open yurt之yurt-tunel-server iptables规则源码分析 -> 正文阅读

[网络协议]open yurt之yurt-tunel-server iptables规则源码分析

简介

yurt-tunel-server是open yurt开源的用于转发来自K8s API server的包含logs、exec和metrics等运维指令请求到边缘节点yurt-tunel-agent的反向代理。
image.png

源码分析

版本v0.4.1,入口函数中:

	if cfg.EnableIptables {
//新建IptablesManager对象
		iptablesMgr := iptables.NewIptablesManager(
			cfg.Client,  //KAS client
			cfg.SharedInformerFactory.Core().V1().Nodes(),  //node informer
			cfg.ListenAddrForMaster,  //https监听地址端口,默认端口是10250
			cfg.ListenInsecureAddrForMaster,   //http监听地址端口,默认“127.0.0.1:10255”
			cfg.IptablesSyncPeriod,   //同步更新iptalbes规则的时间间隔,默认60秒,最小15秒。
                )
		if iptablesMgr == nil {
			return fmt.Errorf("fail to create a new IptableManager")
		}
		wg.Add(1)
		go iptablesMgr.Run(stopCh, &wg)   //启动规则同步任务
	}
// 定时更新规则
func (im *iptablesManager) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	// 等待节点缓存完成同步,也就是从k8s里获取到所有节点信息并缓存起来
	if !cache.WaitForCacheSync(stopCh,
		im.nodeInformer.Informer().HasSynced) {
		klog.Error("sync node cache timeout")
		return
	}
	// 服务启动后首次更新iptables规则
	im.syncIptableSetting()

	ticker := time.NewTicker(time.Duration(im.syncPeriod) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			klog.Info("stop the iptablesManager")
			im.cleanupIptableSetting()
			return
		case <-ticker.C:
			im.syncIptableSetting() //定时更新iptables规则
		}
	}
}
//更新iptables规则
func (im *iptablesManager) syncIptableSetting() {
	//从k8s里指定configMap对象存里提取出除10250和10255之外还需要进行DNAT到insecurePort的边缘侧端口数组
	dnatPorts, err := util.GetConfiguredDnatPorts(im.kubeClient, im.insecurePort)
	if err != nil {
		klog.Errorf("failed to sync iptables rules, %v", err)
		return
	}
	//和上一次缓存的端口数组进行比对,分出哪些是被删除了的,内部源码略去
	portsChanged, deletedDnatPorts := im.getDeletedPorts(dnatPorts)
	//将端口20150和10255加入到端口数组中
	currentDnatPorts := append(dnatPorts, kubeletSecurePort, kubeletInsecurePort)

	// 从k8s查询出所有不带有agent标记的节点,也就是所有云端节点的internal IP
	nodesIP := im.getIPOfNodesWithoutAgent()
	//和上一次缓存的节点internal IP数组进行比对,分出哪些节点新增的,哪些节点是已删除了的,内部源码略去
	nodesChanged, addedNodesIP, deletedNodesIP := im.getAddedAndDeletedNodes(nodesIP)
	//将回环境地址127.0.0.1加入到端口数组中
	currentNodesIP := append(nodesIP, loopbackAddr)

	// 更新iptables规则
	err = im.ensurePortsIptables(currentDnatPorts, deletedDnatPorts, currentNodesIP, deletedNodesIP)
	if err != nil {
		klog.Errorf("failed to ensurePortsIptables: %v", err)
		return
	}

	// 如果端口有变化,更新端口数组缓存lastDnatPorts 
	if portsChanged {
		im.lastDnatPorts = dnatPorts
		if len(deletedDnatPorts) != 0 {
			// 针对删除的DNAT端口,需要清除conntrack相关的规则
			im.clearConnTrackEntries(currentNodesIP, deletedDnatPorts)
		}
		klog.Infof("dnat ports changed, %v", dnatPorts)
	}

	// 如果云端节点internal IP有变化,更新云端节点internal IP数组lastNodesIP
	if nodesChanged {
		im.lastNodesIP = nodesIP
		im.clearConnTrackEntries(append(addedNodesIP, deletedNodesIP...), currentDnatPorts)
		klog.Infof("directly access nodes changed, %v for ports %v", nodesIP, currentDnatPorts)
	}
}
// 从k8s里指定命名空间里查询出指定名称的configMap对象,存有DNAT端口映射对,返回需要进行DANT的端口数组。也就是说,除10250和10255之外的边缘侧端口,如果还需要进行DNAT转发,就在这里配置
func GetConfiguredDnatPorts(client clientset.Interface, insecurePort string) ([]string, error) {
	ports := make([]string, 0)
	c, err := client.CoreV1().
		ConfigMaps(YurttunnelServerDnatConfigMapNs).
		Get(context.Background(), YurttunnelServerDnatConfigMapName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, fmt.Errorf("configmap %s/%s is not found",
				YurttunnelServerDnatConfigMapNs,
				YurttunnelServerDnatConfigMapName)
		} else {
			return nil, fmt.Errorf("fail to get configmap %s/%s: %v",
				YurttunnelServerDnatConfigMapNs,
				YurttunnelServerDnatConfigMapName, err)
		}
	}

	pairStr, ok := c.Data[yurttunnelServerDnatDataKey]
	if !ok || len(pairStr) == 0 {
		return ports, nil
	}
       

	portsPair := strings.Split(pairStr, ",")
	for _, pair := range portsPair {
		portPair := strings.Split(pair, "=")
		if len(portPair) == 2 && portPair[1] == insecurePort && len(portPair[0]) != 0 {
			if portPair[0] != "10250" && portPair[0] != "10255" {
				ports = append(ports, portPair[0])
			}
		}
	}

	return ports, nil
}
//从k8s查询出所有不带有agent标记的节点,也就是所有云端节点的internal IP
func (im *iptablesManager) getIPOfNodesWithoutAgent() []string {
	var nodesIP []string
	nodes, err := im.nodeInformer.Lister().List(labels.Everything())
	if err != nil {
		klog.Errorf("failed to list nodes for iptables: %v", err)
		return nodesIP
	}

	for i := range nodes {
		if withoutAgent(nodes[i]) && isNodeReady(nodes[i]) {
			nodeIPs := getNodeInternalIPs(nodes[i])
			nodesIP = append(nodesIP, nodeIPs...)
		}
	}

	klog.V(4).Infof("nodes without %s: %s", projectinfo.GetAgentName(), strings.Join(nodesIP, ","))
	metrics.Metrics.ObserveCloudNodes(len(nodesIP))
	return nodesIP
}
var (
	tunnelCommentStr   = strings.ReplaceAll(projectinfo.GetTunnelName(), "-", " ")
	//全局变量:iptables跳转规则链数组,初始化一条跳转规则:在DNAT表的OUTPUT链,tcp协议数据包跳到yurttunnelServerPortChain子链处理
	iptablesJumpChains = []iptablesJumpChain{
		{
			table:     iptables.TableNAT,
			dstChain:  yurttunnelServerPortChain,
			srcChain:  iptables.ChainOutput,
			comment:   fmt.Sprintf("%s server port", tunnelCommentStr),
			extraArgs: []string{"-p", "tcp"},
		},
	}
)

func (im *iptablesManager) ensurePortsIptables(currentPorts, deletedPorts, currentIPs, deletedIPs []string) error {
	//针对每个端口,确保有一条从yurttunnelServerPortChain子链跳到该端口号作为后缀命名的二级子链
	jumpChains := iptablesJumpChains
	for _, port := range currentPorts {
		jumpChains = append(jumpChains, iptablesJumpChain{
			table:     iptables.TableNAT,
			dstChain:  iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port)),
			srcChain:  yurttunnelServerPortChain,
			comment:   fmt.Sprintf("jump to port %s", port),
			extraArgs: []string{"-p", "tcp", "--dport", port},
		})
	}
	if err := im.ensureJumpChains(jumpChains); err != nil {
		klog.Errorf("Failed to ensure jump chain, %v", err)
		return err
	}

	// 针对每个端口,确保二级子链中的规则,于是重点就在ensurePortIptables函数中
	for _, port := range currentPorts {
		err := im.ensurePortIptables(port, currentIPs, deletedIPs)
		if err != nil {
			return err
		}
	}

	if len(deletedPorts) == 0 {
		return nil
	}

	// 如果某个端口被删除,则清除相应的二级子链
	var deletedJumpChains []iptablesJumpChain
	for _, port := range deletedPorts {
		deletedJumpChains = append(deletedJumpChains, iptablesJumpChain{
			table:     iptables.TableNAT,
			dstChain:  iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port)),
			srcChain:  yurttunnelServerPortChain,
			comment:   fmt.Sprintf("jump to port %s", port),
			extraArgs: []string{"-p", "tcp", "--dport", port},
		})
	}
	if err := im.deleteJumpChains(deletedJumpChains); err != nil {
		klog.Errorf("Failed to delete jump chain, %v", err)
		return err
	}

	return nil
}
func (im *iptablesManager) ensurePortIptables(port string, currentIPs, deletedIPs []string) error {
	//port 对应的二级子链名
	portChain := iptables.Chain(fmt.Sprintf("%s%s", yurttunnelPortChainPrefix, port))
	//如果没有云端节点,则清除该二级链下的所有规则,实际上这个条件应该不会出现true的情况
	if len(currentIPs) == 0 {
		_ = im.iptables.FlushChain(iptables.TableNAT, portChain)
		return nil
	}

	// 确保port对应的二级子链存在,没有则添加
	if _, err := im.iptables.EnsureChain(iptables.TableNAT, portChain); err != nil {
		klog.Errorf("could not ensure chain for tunnel server port(%s), %v", port, err)
		return err
	}

	//proxyDest用来放本地DNAT重定向目的地址。 如果API请求端口是10250,那么代理地址就是本地https监听地址,否则是本地http监听地址
	proxyDest := im.insecureDnatDest
	if port == kubeletSecurePort {
		proxyDest = im.secureDnatDest
	}

	// 针对所有现存云端节点的internal IP, tcp协议数据包全部直接RETURN操作,即不做DNAT处理。也就是说,如果API请求的目的IP是云端节点,那就不进行DNAT转发。
	for _, ip := range currentIPs {
		reqReturnPortIptablesArgs := reqReturnIptablesArgs(reqReturnComment, port, ip)
		_, err := im.iptables.EnsureRule(
			iptables.Prepend,
			iptables.TableNAT, portChain, reqReturnPortIptablesArgs...)
		if err != nil {
			klog.Errorf("could not ensure -j RETURN iptables rule for %s:%s: %v", ip, port, err)
			return err
		}
	}

	// 其它的tcp协议数据包,重定向到代理地址。也就是说,如果API请求的目的IP是边缘节点,那就进行DNAT转发到本地DNAT重定向目的地址,进而通过隧道转发到边缘
	dnatPortIptablesArgs := dnatIptablesArgs(dnatToTunnelComment, port, proxyDest)
	_, err := im.iptables.EnsureRule(
		iptables.Append,
		iptables.TableNAT, portChain, dnatPortIptablesArgs...)
	if err != nil {
		klog.Errorf("could not ensure dnat iptables rule for %s, %v", port, err)
		return err
	}

	//针对所有已删除云端节点的internal IP,删除对应的RETURN规则。
	for _, ip := range deletedIPs {
		deletedIPIptablesArgs := reqReturnIptablesArgs(reqReturnComment, port, ip)
		err = im.iptables.DeleteRule(iptables.TableNAT,
			portChain, deletedIPIptablesArgs...)
		if err != nil {
			klog.Errorf("could not delete old iptables rules for %s:%s: %v", ip, port, err)
			return err
		}
	}

	return nil
}
//公共规则参数,也就是匹配API请求的目的端口
func iptablesCommonArgs(msg, destPort string, destIP net.IP) []string {
	args := []string{
		"-p", "tcp",
		"-m", "comment",
	}
	if len(msg) != 0 {
		args = append(args, "--comment", msg)
	}
	if len(destPort) != 0 {
		args = append(args, "--dport", destPort)
	}
	if destIP != nil {
		ip := toCIDR(destIP)
		args = append(args, "-d", ip)
	}
	return args
}

//DNAT操作规则参数
func dnatIptablesArgs(msg, destPort, proxyDest string) []string {
	args := iptablesCommonArgs(msg, destPort, nil)
	args = append(args, "-j", "DNAT", "--to-destination", proxyDest)
	return args
}

//RETURN操作规则参数,非DNAT
func reqReturnIptablesArgs(msg, destPort, ip string) []string {
	destIP := net.ParseIP(ip)
	args := iptablesCommonArgs(msg, destPort, destIP)
	args = append(args, "-j", "RETURN")
	return args
}

总结

  • 定时从k8s获取云端节点信息(主要是节点的internal IP)列表和需要代理转发的目的端口(10250和10255不需要在configMap中配置,其它端口需要在在configMap中配置)列表,进行iptables规则更新。
  • 凡是目的地址是云端节点的tcp包都不做DNAT处理,非云端节点的tcp包就做DNAT处理进而交由tunel转发到边缘节点。
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-08-19 12:24:57  更:2021-08-19 12:25:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/10 3:17:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码