Python 调用 Ansible API
1.安装
pip install ansible
注意:Ansible 只能在 Linux 上运行,可以尝试使用 PyCharm 远程调试。
2.调用方式
1.adhoc 直接调用ansible模块
# Directory handed to Ansible as ``module_path`` in the CLI arguments built by
# adhoc() and execPlaybook().
# NOTE(review): hard-coded to one specific Python 3.7 environment — adjust it to
# the site-packages location of your own Ansible installation.
MODULE_PATH = "//home//production//env//devops//lib//python3.7//site-packages//ansible//modules"
def adhoc(host_list, task_list, remote_user='root', become=None, become_user=None):
    """Run ad-hoc Ansible tasks on ``host_list`` and collect per-host results.

    Args:
        host_list: Host names/addresses; joined with commas to form the
            inventory source and also used as the play's ``hosts``.
        task_list: Task definitions placed under ``tasks`` of the generated play.
        remote_user: Value of the ``remote_user`` CLI argument.
        become: Value of the ``become`` CLI argument.
        become_user: Value of the ``become_user`` CLI argument.

    Returns:
        A dict ``{'success': {...}, 'failed': {...}, 'unreachable': {...}}``
        whose inner dicts map the host name recorded by the callback to the
        data it stored for that host. If running the play raises, the caught
        exception object is returned instead (it is not re-raised).
    """
    context.CLIARGS = ImmutableDict(
        connection='smart', remote_user=remote_user, verbosity=3,
        module_path=MODULE_PATH, forks=20, become=become,
        become_method=None, become_user=become_user, check=False, diff=False,
    )

    # A single host needs a trailing comma; the upstream API example does the
    # same so the string is read as a host list rather than an inventory path.
    sources = ','.join(host_list)
    if len(host_list) == 1:
        sources += ','

    loader = DataLoader()
    passwords = dict()
    results_callback = ResultsCollectorJSONCallback()
    inventory = InventoryManager(loader=loader, sources=sources)
    variable_manager = VariableManager(loader=loader, inventory=inventory)

    # Keep the TaskQueueManager construction ahead of Play().load(): the
    # upstream API example notes it must be initialised first.
    tqm = TaskQueueManager(
        inventory=inventory,
        variable_manager=variable_manager,
        loader=loader,
        passwords=passwords,
        stdout_callback=results_callback,
    )
    play_source = dict(
        name="Ansible Play",
        hosts=host_list,
        gather_facts='no',
        tasks=task_list,
    )

    try:
        # A failure while loading the play still propagates to the caller, but
        # now only after the cleanup in ``finally`` has run.
        play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
        try:
            tqm.run(play)
        except Exception as e:
            # Existing contract: hand the exception back instead of raising.
            return e
    finally:
        tqm.cleanup()
        if loader:
            loader.cleanup_all_tmp_files()
        # Remove Ansible's local temp dir on every exit path, including errors.
        shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    collected = (
        ('success', results_callback.host_ok),
        ('failed', results_callback.host_failed),
        ('unreachable', results_callback.host_unreachable),
    )
    results_raw = {}
    for label, per_host in collected:
        bucket = {}
        for host_name, entry in per_host.items():
            # The callback stores plain dicts, which have no ``_result``
            # attribute; use them as-is and only unwrap result-like objects.
            bucket[host_name] = getattr(entry, '_result', entry)
        results_raw[label] = bucket
    return results_raw
2.playbook 调用yaml脚本
def execPlaybook(playbooks, host_list, remote_user='production', become=None, become_user=None):
    """Run playbooks against ``host_list`` and collect per-host results.

    Args:
        playbooks: Passed unchanged to ``PlaybookExecutor(playbooks=...)``
            (presumably a list of playbook file paths — confirm with callers).
        host_list: Host names/addresses; joined with commas to form the
            inventory source.
        remote_user: Value of the ``remote_user`` CLI argument.
        become: Value of the ``become`` CLI argument.
        become_user: Value of the ``become_user`` CLI argument.

    Returns:
        A dict ``{'success': {...}, 'failed': {...}, 'unreachable': {...}}``
        whose inner dicts map the host name recorded by the callback to the
        data it stored for that host. If the run raises, the caught exception
        object is returned instead (it is not re-raised).
    """
    context.CLIARGS = ImmutableDict(
        connection='smart', remote_user=remote_user, verbosity=3,
        module_path=MODULE_PATH, forks=20, become=become,
        syntax=None, start_at_task=None,
        become_method=None, become_user=become_user, check=False, diff=False,
    )

    # A single host needs a trailing comma; the upstream API example does the
    # same so the string is read as a host list rather than an inventory path.
    sources = ','.join(host_list)
    if len(host_list) == 1:
        sources += ','

    loader = DataLoader()
    passwords = dict()
    results_callback = ResultsCollectorJSONCallback()
    inventory = InventoryManager(loader=loader, sources=sources)
    variable_manager = VariableManager(loader=loader, inventory=inventory)

    playbook = PlaybookExecutor(
        playbooks=playbooks,
        inventory=inventory,
        variable_manager=variable_manager,
        loader=loader,
        passwords=passwords,
    )
    # Swap the collector in on the executor's task queue manager so the
    # per-host results end up in ``results_callback``.
    playbook._tqm._stdout_callback = results_callback

    try:
        playbook.run()
    except Exception as e:
        # Existing contract: hand the exception back instead of raising.
        return e
    finally:
        # Remove Ansible's local temp dir on every exit path, including errors.
        shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    collected = (
        ('success', results_callback.host_ok),
        ('failed', results_callback.host_failed),
        ('unreachable', results_callback.host_unreachable),
    )
    results_raw = {}
    for label, per_host in collected:
        bucket = {}
        for host_name, entry in per_host.items():
            # The callback stores plain dicts, which have no ``_result``
            # attribute; use them as-is and only unwrap result-like objects.
            bucket[host_name] = getattr(entry, '_result', entry)
        results_raw[label] = bucket
    return results_raw
3.设置回调方法
class ResultsCollectorJSONCallback(CallbackBase):
    """Callback that keeps a trimmed copy of each host's task result.

    Results are sorted into three dicts keyed by host name: ``host_ok``,
    ``host_failed`` and ``host_unreachable``. Each new result for a host
    replaces the entry previously stored for that host in the same dict.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Store ``unreachable`` and ``msg`` for a host that could not be reached."""
        payload = result._result
        wanted = ("unreachable", "msg")
        self.host_unreachable[result._host.get_name()] = {
            key: payload.get(key) for key in wanted
        }

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store the selected fields of a successful result for later retrieval."""
        payload = result._result
        # Tuple order is the key order of the stored dict.
        wanted = (
            "stdout_lines", "stderr_lines", "cmd", "delta",
            "start", "end", "rc", "changed",
        )
        self.host_ok[result._host.get_name()] = {
            key: payload.get(key) for key in wanted
        }

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store the selected fields (including ``msg``) of a failed result."""
        payload = result._result
        # Tuple order is the key order of the stored dict.
        wanted = (
            "stdout_lines", "stderr_lines", "cmd", "delta",
            "start", "msg", "end", "rc", "changed",
        )
        self.host_failed[result._host.get_name()] = {
            key: payload.get(key) for key in wanted
        }
4.main方法使用
if __name__ == '__main__':
    # Example run: one ``setup`` task filtered to ``ansible_nodename``.
    host = ['10.0.0.0', '10.0.0.0']
    tasks = [
        dict(name='222222', action=dict(module="setup", args=dict(filter='ansible_nodename'))),
    ]
    result = adhoc(host_list=host, task_list=tasks)
    # adhoc() returns the exception object when the run fails; re-raise it so
    # the real error is shown instead of a JSON serialisation TypeError.
    if isinstance(result, Exception):
        raise result
    print(json.dumps(result, indent=4))
|