IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> APISIX源码解析-ETCD-new方法 -> 正文阅读

[大数据]APISIX源码解析-ETCD-new方法

方法介绍

    local user_routes, err = core.config.new("/routes", {
            automatic = true,
            item_schema = core.schema.route,
            checker = check_route,
            filter = filter,
        })
    if not user_routes then
        error("failed to create etcd instance for fetching /routes : " .. err)
    end

从字面上很容易误导成是单纯的new个对象信息出来

源码解析

new(key,opts)

function _M.new(key, opts)
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end
    local health_check_timeout = etcd_conf.health_check_timeout
    if not health_check_timeout or health_check_timeout < 0 then
        health_check_timeout = 10
    end

    local automatic = opts and opts.automatic
    local item_schema = opts and opts.item_schema
    local filter_fun = opts and opts.filter
    local timeout = opts and opts.timeout
    local single_item = opts and opts.single_item
    local checker = opts and opts.checker

    -- 初始化数据
    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,
        running = true,
        conf_version = 0,
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        health_check_timeout = health_check_timeout,
        timeout = timeout,
        single_item = single_item,
        filter = filter_fun,
    }, mt)

    -- 自动同步数据
    if automatic then
        if not key then
            return nil, "missing `key` argument"
        end

        -- loaded_configuration 该变量是在etcd init方法初始化的时候写入的 ,
        -- 会根据短key将对应ETCD中的数据按node结构封装到变量中 ,如下:
        -- HTTP_ETCD_DIRECTORY = {
        --     ["/upstreams"] = true,
        --     ["/plugins"] = true,
        --     ["/ssl"] = true,
        --     ["/stream_routes"] = true,
        --     ["/plugin_metadata"] = true,
        --     ["/routes"] = true,
        --     ["/services"] = true,
        --     ["/consumers"] = true,
        --     ["/global_rules"] = true,
        --     ["/proto"] = true,
        --     ["/plugin_configs"] = true,
        -- },
        -- STREAM_ETCD_DIRECTORY = {
        --     ["/upstreams"] = true,
        --     ["/plugins"] = true,
        --     ["/ssl"] = true,
        --     ["/stream_routes"] = true,
        --     ["/plugin_metadata"] = true,
        -- }
        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            -- 若有值进行清空,后面重新sync load
            loaded_configuration[key] = nil -- tried to load

            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            load_full_data(obj, dir_res, headers)
        end

        ngx_timer_at(0, _automatic_fetch, obj)

    else
        -- 如果不需要自动同步数据,设置etcd client
        local etcd_cli, err = get_etcd()
        if not etcd_cli then
            return nil, "failed to start a etcd instance: " .. err
        end
        obj.etcd_cli = etcd_cli
    end

    if key then
        created_obj[key] = obj
    end

    return obj
end

load_full_data(self, dir_res, headers)

        self.values = new_tab(#dir_res.nodes, 0)
        self.values_hash = new_tab(0, #dir_res.nodes)

        for _, item in ipairs(dir_res.nodes) do
            local key = short_key(self, item.key)
            local data_valid = true
            if type(item.value) ~= "table" then
                data_valid = false
                log.error("invalid item data of [", self.key .. "/" .. key,
                          "], val: ", item.value,
                          ", it should be an object")
            end

            -- 对数据进行各种验证
            if data_valid and self.item_schema then
                data_valid, err = check_schema(self.item_schema, item.value)
                if not data_valid then
                    log.error("failed to check item data of [", self.key,
                              "] err:", err, " ,val: ", json.encode(item.value))
                end
            end

            if data_valid and self.checker then
                data_valid, err = self.checker(item.value)
                if not data_valid then
                    log.error("failed to check item data of [", self.key,
                              "] err:", err, " ,val: ", json.delay_encode(item.value))
                end
            end

            if data_valid then
                changed = true
                -- 往对象obj中 添加对应的node数据
                insert_tab(self.values, item)
                -- hash值 为序号,因为#self.values随着node增加  值数量在递增
                self.values_hash[key] = #self.values

                item.value.id = key
                item.clean_handlers = {}

                if self.filter then
                    self.filter(item)
                end
            end

            self:upgrade_version(item.modifiedIndex)
        end
.........
-- 最后这个很重要
self.need_reload = false

ngx_timer_at

ngx_timer_at(0, _automatic_fetch, obj)

demo:

local delay = 5
local handler
handler = function (premature,param)
    if premature then
        return
    end

    ngx.log(ngx.ERR, "param is : ", param)
end

local ok, err = ngx.timer.at(delay, handler,"Hello world")

premature这个参数不是调用ngx.timer.at传入的参数,它是一个固定的参数,表示定时器是否过期;
param才是调用时的额外参数

_automatic_fetch(premature, self)

......
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                          .. self.key .. "]: " .. (err or "unknown"))
                end
                -- 获取etcd实例
                self.etcd_cli = etcd_cli
            end

            local ok, err = sync_data(self)
.........

sync_data(self)

    -- 需要重新加载数据
    if self.need_reload then
        -- 全量加载数据
        local res, err = readdir(self.etcd_cli, self.key)
        if not res then
            return false, err
        end

        local dir_res, headers = res.body.node or {}, res.headers
        log.debug("readdir key: ", self.key, " res: ",
                  json.delay_encode(dir_res))
        if not dir_res then
            return false, err
        end
        --  清理历史数据
        if self.values then
            for i, val in ipairs(self.values) do
                if val and val.clean_handlers then
                    for _, clean_handler in ipairs(val.clean_handlers) do
                        -- 触发清理事件
                        clean_handler(val)
                    end
                    val.clean_handlers = nil
                end
            end

            self.values = nil
            self.values_hash = nil
        end
        --  获取到的key的全量数据,设置到self(obj)中
        --  need_reload = false;
        load_full_data(self, dir_res, headers)
        return true
    end
    -- 如果need_reload = false 采用watch事件通知的方式,动态获取增量数据
    local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout)
    log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1)
    log.info("res: ", json.delay_encode(dir_res, true))
   .........
   if pre_index then
            local pre_val = self.values[pre_index]
            if pre_val and pre_val.clean_handlers then
                for _, clean_handler in ipairs(pre_val.clean_handlers) do
                    clean_handler(pre_val)
                end
                pre_val.clean_handlers = nil
            end

            if res.value then
                if not self.single_item then
                    res.value.id = key
                end

                self.values[pre_index] = res
                res.clean_handlers = {}
                log.info("update data by key: ", key)

            else
                -- 这边很重要,当结果为空时,对应pre_index索引的node值设置成空
                -- 索引key一般为随机串
                -- 时间长了,就会有很多空值,占用空间,这个时候就需要清理了
                self.sync_times = self.sync_times + 1
                self.values[pre_index] = false
                self.values_hash[key] = nil
                log.info("delete data by key: ", key)
            end

        elseif res.value then
            res.clean_handlers = {}
            insert_tab(self.values, res)
            self.values_hash[key] = #self.values
            if not self.single_item then
                res.value.id = key
            end

            log.info("insert data by key: ", key)
        end
        -- avoid space waste
        -- 这边设置成 同步过100次就进行空间清理
        if self.sync_times > 100 then
            local values_original = table.clone(self.values)
            --  清理values
            table.clear(self.values)

            for i = 1, #values_original do
                local val = values_original[i]
                if val then
                    table.insert(self.values, val)
                end
            end
            --  清理values_hash
            table.clear(self.values_hash)
            log.info("clear stale data in `values_hash` for key: ", key)

            for i = 1, #self.values do
                key = short_key(self, self.values[i].key)
                self.values_hash[key] = i
            end
            
            self.sync_times = 0
        end
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-01 20:41:02  更:2022-02-01 20:42:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:20:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码