import requests,json,sqlite3,uuid
import time
import datetime
from Jenkins_Api import jenkins_job_build
from send_email import send_mail_build
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36',
}
git_url='https://git.xxx.com/'
git_token='xxxxxxxxxxxxxx'
mailto_list=['xxxxxxxxx@163.com','xxxxxxx@qq.com']
session = requests.Session()
headers['PRIVATE-TOKEN']=git_token
session.headers = headers
git_login=session.get(git_url,headers=headers)
def gitlab_project_commits():
project_branchs_commit_api = git_url + '/api/v4/projects/25/repository/commits?per_page=1&ref_name=test'
project_branchs_commit_headers = session.head(project_branchs_commit_api).headers
project_branchs_commit_api_page=project_branchs_commit_api+'&page=1'
project_branchs = session.get(project_branchs_commit_api_page).text
project_branchs_commit_json = json.loads(project_branchs)
print(project_branchs_commit_json[0])
project_branch_commit_json = project_branchs_commit_json[0]
print("分支信息获取完成")
return project_branch_commit_json
def push_jenkins(job_name,branch):
job = jenkins_job_build()
item_numb=job.build_jenkins_job(job_name,{"branchName":branch})
get_build_number = job.build_number(item_numb)
while job.job_is_building(job_name, get_build_number) is True:
time.sleep(10)
get_build_status = job.get_job_build_status(job_name,get_build_number)
return get_build_status
def send_email(email,title,content):
mailto_list.append(email)
if send_mail_build().send_mail(mailto_list,title,content):
print("send email done!")
return True
else:
print("send email failed!")
return False
if __name__ == "__main__":
last_commit_id = ""
while True:
time.sleep(10)
project_branch_commit_json = gitlab_project_commits()
current_commit_id = project_branch_commit_json['id']
if current_commit_id != last_commit_id:
commit_create_at=project_branch_commit_json['created_at']
commit_create_at= commit_create_at[0:19].replace("T"," ")
date_time = datetime.datetime.strptime(commit_create_at,'%Y-%m-%d %H:%M:%S')
date_time += datetime.timedelta(minutes=5)
sleep_time = (date_time - datetime.datetime.now()).total_seconds()
if sleep_time > 0:
time.sleep(sleep_time)
pjr = push_jenkins("job_name","test")
project_branch_commit_json["push_jenkins_result"] = pjr
ses = send_email(project_branch_commit_json['author_email'],"【Jenkins】自动部署通知",str(project_branch_commit_json));
print("休眠后执行",ses)
else:
pjr = push_jenkins("job_name","test")
project_branch_commit_json["push_jenkins_result"] = pjr
ses = send_email(project_branch_commit_json['author_email'],"【Jenkins】自动部署通知",str(project_branch_commit_json));
print("直接执行",ses)
last_commit_id = current_commit_id
print("发布完成")
print("未发现新的提交")
jenkins_job_build
import time
import jenkins
jenkins_server_url = 'http://jenkins.xxxx.com/'
user_id = 'admin'
api_token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
class jenkins_job_build(object):
def __init__(self):
self.server = jenkins.Jenkins(jenkins_server_url, username=user_id, password=api_token)
def build_jenkins_job(self, job_name,parameters=None):
item_number = self.server.build_job(job_name,parameters=parameters)
return item_number
def build_number(self,item_number):
while True:
time.sleep(1)
build_info = self.server.get_queue_item(item_number)
if 'executable' in build_info:
build_number =build_info['executable']["number"]
return build_number
def get_job_lastBuild_number(self, job_name):
lastBuild_number = self.server.get_job_info(job_name)['lastBuild']['number']
return lastBuild_number
def job_is_building(self, job_name, build_number):
is_building = self.server.get_build_info(job_name, build_number)['building']
return is_building
def get_job_build_status(self, job_name, build_number):
job_status = self.server.get_build_info(job_name, build_number)['result']
return job_status
def get_all_building_jobs(self):
building_jobs = self.server.get_running_builds()
return building_jobs
def get_all_jobs(self):
jobs = self.server.get_jobs()
return jobs
def get_build_job_log(self,job_name,job_number):
return self.server.get_build_console_output(job_name,job_number)
send_mail_build
import smtplib
from email.mime.text import MIMEText
mailto_list=['xxxxxx@163.com','xxxxx@qq.com']
mail_host="smtp.163.com"
mail_user="xxxxxx"
mail_pass="xxxxxx"
mail_postfix="163.com"
class send_mail_build(object):
def __init__(self):
self.server = smtplib.SMTP()
self.server.connect(mail_host)
self.server.login(mail_user,mail_pass)
def send_mail(self,to_list,sub,content):
me="jenkins auto push"+"<"+mail_user+"@"+mail_postfix+">"
msg = MIMEText(content,_subtype='plain')
msg['Subject'] = sub
msg['From'] = me
msg['To'] = ";".join(to_list)
try:
self.server.sendmail(me, to_list, msg.as_string())
self.server.close()
return True
except Exception as e:
self.server.close()
print(str(e))
return False
|