方法介绍
-- Options for the `/routes` configuration object: fetch automatically,
-- validate every item against the route schema and `check_route`, and run
-- `filter` on each accepted item.
local route_opts = {
    automatic   = true,
    item_schema = core.schema.route,
    checker     = check_route,
    filter      = filter,
}

-- `core.config.new` returns nil plus an error message on failure; abort
-- loudly in that case because routing cannot work without this object.
local user_routes, err = core.config.new("/routes", route_opts)
if not user_routes then
    error("failed to create etcd instance for fetching /routes : " .. err)
end
从字面上看，很容易误以为 new 只是单纯地创建并返回一个对象；实际上当 automatic 为 true 时，它还会通过 ngx_timer_at 启动 _automatic_fetch，从 etcd 拉取并同步数据。
源码解析
new(key,opts)
--- Create a configuration object bound to an etcd key.
--
-- This is more than a plain constructor. With `opts.automatic` set it
-- consumes any configuration already present in `loaded_configuration[key]`
-- and schedules `_automatic_fetch` on a zero-delay timer; otherwise it
-- creates an etcd client eagerly and stores it on the object.
--
-- @param key   key relative to the configured etcd prefix; required when
--              `opts.automatic` is set
-- @param opts  optional table with the fields `automatic`, `item_schema`,
--              `filter`, `timeout`, `single_item` and `checker`
-- @return the new object on success, or `nil` plus an error message
function _M.new(key, opts)
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    local automatic = opts and opts.automatic

    -- Validate before building anything: an automatic object without a key
    -- has nothing to fetch.
    if automatic and not key then
        return nil, "missing `key` argument"
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix

    -- Fall back to defaults when the setting is missing or negative.
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end

    local health_check_timeout = etcd_conf.health_check_timeout
    if not health_check_timeout or health_check_timeout < 0 then
        health_check_timeout = 10
    end

    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,
        item_schema = opts and opts.item_schema,
        checker = opts and opts.checker,
        sync_times = 0,
        running = true,
        conf_version = 0,
        values = nil,
        need_reload = true,
        routes_hash = nil,
        prev_index = 0,
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        health_check_timeout = health_check_timeout,
        timeout = opts and opts.timeout,
        single_item = opts and opts.single_item,
        filter = opts and opts.filter,
    }, mt)

    if automatic then
        -- Consume configuration that was loaded ahead of time, if any, so
        -- the object has values before the first fetch completes.
        local res = loaded_configuration[key]
        if res then
            loaded_configuration[key] = nil
            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            load_full_data(obj, dir_res, headers)
        end

        -- Assumes ngx_timer_at follows the ngx.timer.at contract (truthy on
        -- success, otherwise nil plus an error string). A failure used to be
        -- dropped silently, leaving an object that never syncs; report it.
        local ok, timer_err = ngx_timer_at(0, _automatic_fetch, obj)
        if not ok then
            log.error("failed to create timer for key [", obj.key, "]: ",
                      timer_err)
        end

    else
        local etcd_cli, etcd_err = get_etcd()
        if not etcd_cli then
            -- Guard against a nil error so the concatenation cannot raise.
            return nil, "failed to start a etcd instance: "
                        .. (etcd_err or "unknown")
        end
        obj.etcd_cli = etcd_cli
    end

    -- Register the object under its un-prefixed key.
    if key then
        created_obj[key] = obj
    end

    return obj
end
load_full_data(self, dir_res, headers)
-- Excerpt from the body of load_full_data(self, dir_res, headers): rebuilds
-- self.values / self.values_hash from the nodes of a directory listing.
-- NOTE(review): `err` and `changed` are assigned below without a visible
-- declaration; presumably they are locals declared in the elided part of the
-- function - confirm, otherwise they would be globals.
-- self.values is filled as a list, self.values_hash maps short key -> index
-- in that list (new_tab presumably pre-sizes the array/hash parts - confirm).
self.values = new_tab(#dir_res.nodes, 0)
self.values_hash = new_tab(0, #dir_res.nodes)
for _, item in ipairs(dir_res.nodes) do
-- presumably the item key with the directory prefix removed - see short_key
local key = short_key(self, item.key)
local data_valid = true
-- Step 1: the stored value must be a table.
if type(item.value) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. key,
"], val: ", item.value,
", it should be an object")
end
-- Step 2: validate against the schema, when one was supplied.
if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item.value)
if not data_valid then
-- NOTE(review): this branch uses json.encode while the checker branch
-- below uses json.delay_encode.
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.encode(item.value))
end
end
-- Step 3: run the caller-supplied checker, when one was supplied.
if data_valid and self.checker then
data_valid, err = self.checker(item.value)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item.value))
end
end
-- Only items that passed every check are stored.
if data_valid then
changed = true
insert_tab(self.values, item)
-- remember where the item landed so it can be found by key later
self.values_hash[key] = #self.values
item.value.id = key
item.clean_handlers = {}
if self.filter then
self.filter(item)
end
end
-- Called for every item, including rejected ones.
self:upgrade_version(item.modifiedIndex)
end
.........
-- A full load has been done; sync_data only performs its full-read branch
-- while need_reload is set.
self.need_reload = false
ngx_timer_at
ngx_timer_at(0, _automatic_fetch, obj)
demo:
-- Demo: run `handler` once, `delay` seconds from now.
local delay = 5

-- Timer callback.
-- @param premature  supplied by the timer machinery, not by the caller of
--                   ngx.timer.at; true when the timer fires early (for
--                   example because the worker is shutting down)
-- @param param      the extra argument passed to ngx.timer.at
local function handler(premature, param)
    if premature then
        return
    end
    ngx.log(ngx.ERR, "param is : ", param)
end

-- ngx.timer.at returns a truthy value on success, or nil plus an error
-- string; check it so a failed registration does not go unnoticed.
local ok, err = ngx.timer.at(delay, handler, "Hello world")
if not ok then
    ngx.log(ngx.ERR, "failed to create timer: ", err)
end
premature 不是调用 ngx.timer.at 时传入的参数，而是定时器回调固定接收的第一个参数，表示定时器是否被提前触发（例如 worker 进程正在退出时为 true）；param 才是调用 ngx.timer.at 时传入的额外参数。
_automatic_fetch(premature, self)
......
-- Excerpt from _automatic_fetch(premature, self).
-- Create the etcd client lazily: objects built by new() with `automatic` set
-- start with etcd_cli = nil, so the first timer run has to create it.
if not self.etcd_cli then
local etcd_cli, err = get_etcd()
if not etcd_cli then
-- Raises instead of returning; `err` is guarded against nil.
error("failed to create etcd instance for key ["
.. self.key .. "]: " .. (err or "unknown"))
end
self.etcd_cli = etcd_cli
end
-- One synchronisation step; the handling of ok/err is in the elided part.
local ok, err = sync_data(self)
.........
sync_data(self)
-- Excerpt from sync_data(self), first part.
-- Full-reload branch: read the whole directory and rebuild the local copy.
if self.need_reload then
local res, err = readdir(self.etcd_cli, self.key)
if not res then
return false, err
end
local dir_res, headers = res.body.node or {}, res.headers
log.debug("readdir key: ", self.key, " res: ",
json.delay_encode(dir_res))
-- NOTE(review): dir_res can never be falsy here because of the `or {}`
-- fallback above, so this check looks unreachable.
if not dir_res then
return false, err
end
-- Let every previously loaded value run its clean handlers before the old
-- tables are dropped.
if self.values then
for i, val in ipairs(self.values) do
if val and val.clean_handlers then
for _, clean_handler in ipairs(val.clean_handlers) do
clean_handler(val)
end
val.clean_handlers = nil
end
end
self.values = nil
self.values_hash = nil
end
load_full_data(self, dir_res, headers)
return true
end
-- Incremental branch: wait for a change on the key starting after the last
-- index seen (prev_index + 1), using the object's timeout.
local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout)
log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1)
log.info("res: ", json.delay_encode(dir_res, true))
.........
-- Excerpt from sync_data(self), applying one watched change to the local copy.
-- `res`, `key` and `pre_index` come from the elided part; pre_index is
-- presumably self.values_hash[key], the slot of an already known item - confirm.
if pre_index then
-- The key is already known: run the old value's clean handlers first.
local pre_val = self.values[pre_index]
if pre_val and pre_val.clean_handlers then
for _, clean_handler in ipairs(pre_val.clean_handlers) do
clean_handler(pre_val)
end
pre_val.clean_handlers = nil
end
if res.value then
-- Update: replace the item in its existing slot.
if not self.single_item then
res.value.id = key
end
self.values[pre_index] = res
res.clean_handlers = {}
log.info("update data by key: ", key)
else
-- Delete: the slot is set to false instead of being removed, which leaves
-- the indexes stored in values_hash for other keys untouched. sync_times
-- counts these deletions and triggers the compaction below.
self.sync_times = self.sync_times + 1
self.values[pre_index] = false
self.values_hash[key] = nil
log.info("delete data by key: ", key)
end
elseif res.value then
-- Insert: unknown key with a value, append it and record its index.
res.clean_handlers = {}
insert_tab(self.values, res)
self.values_hash[key] = #self.values
if not self.single_item then
res.value.id = key
end
log.info("insert data by key: ", key)
end
-- Compaction: after more than 100 deletions, drop the `false` placeholders
-- from self.values and rebuild values_hash with the new indexes.
-- table.clone / table.clear are not standard Lua; presumably LuaJIT or
-- OpenResty extensions - confirm.
if self.sync_times > 100 then
local values_original = table.clone(self.values)
table.clear(self.values)
for i = 1, #values_original do
local val = values_original[i]
if val then
table.insert(self.values, val)
end
end
table.clear(self.values_hash)
log.info("clear stale data in `values_hash` for key: ", key)
for i = 1, #self.values do
-- NOTE(review): this overwrites the `key` variable used above.
key = short_key(self, self.values[i].key)
self.values_hash[key] = i
end
self.sync_times = 0
end
|