现在很多的app都很喜欢在微信或者支付宝的小程序内做开发,毕竟比较方便、安全、有流量、不需要再次下载app,好多人会因为加入你让他下载app他会扭头就走不用你的app,毕竟做类似产品的不是你一家。
之前做过很多微信小程序的爬虫任务,今天做下记录,防止很久不用后就会忘记,微信小程序分为两大类:
1、是不需要登录的(这种的话不做分析,毕竟没什么反爬)
2、需要登录的
2.1 登录一次之后token永久有效
2.2 登录一次token几分钟内到几小时内失效
2.2.1 登录后一段时间后token时候需要再次调用微信内部方法生成code去换取token(本次主要做的)
2.2.2 跟2.2.1类似,然后又加了一道校验,比如图片验证码,这个类似于微信公众号的茅台预约那种(本次不做分析)
微信小程序的登录其实跟其他的web登录不太一样,一般的web登录或者是app登录基本上就是用户名+密码+验证码(图片或者短信)就可以,微信的逻辑是假如你需要登录的话需要获得用户的授权,之后调用微信的内部方法生成一个code,code只能用一次之后就实效,微信解释这个code有效期是5分钟左右。
这里是具体流程:https://developers.weixin.qq.com/community/develop/doc/000c2424654c40bd9c960e71e5b009?highLine=code
之前爬取过的一个小程序他的反爬是token有效期一个小时,然后单次token可用大概100次左右,当单个token使用次数或者单小时内使用次数超过100次就直接封号处理,24小时内也有频率控制,所以就需要我每小时一次每小时一次的去获取token,当然,因为我是个程序猿,所以我不能每小时手动的去获取这个token,比较这不是我们的风格。
这里需要的是python+fiddler+appium+模拟器,大致的思路是通过appium去操控模拟器模拟点击微信的小程序,定期的去做点击,然后fiddler去从请求的头部信息中获取到token,之后写到本地文件中,然后python程序定时的去判断这个本地文件是否进行了更新,更新了的话通过正则来获取到token_list之后去最后一个,因为有可能是当前保存的token已经失效了,小程序还会再次去拿这个token尝试请求一下,假如失效了会调用微信的内部方法生成code来换取token,我这里的爬虫主代码是运行在服务器的,所有又增加了Redis来存储token。
一、微信模拟点击
微信按照需求条件时间频率模拟点击、滑动、退出等操作,以下的ding_talk的send_msg是增加的钉钉发送消息,此处不再添加,有需求的可以自己查看钉钉机器人文档或者依据自己的需求调整自己的消息提醒。
import time
import logging
from appium import webdriver
from ding_talk import send_msg
from handle_file import EnToken
from conf.dbr import RedisClient
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from config import *
LOG_FORMAT = "%(asctime)s - %(levelname)s - line:%(lineno)s - msg:%(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
# logging.FileHandler(filename='app.log', encoding='utf-8')
# 微信获取en token
class WeChat(object):
def __init__(self):
"""
初始化
"""
# 驱动配置
self.desired_caps = {
'platformName': PLATFORM,
'deviceName': DEVICE_NAME,
'appPackage': APP_PACKAGE,
'appActivity': APP_ACTIVITY,
'noReset': True
}
self.driver = webdriver.Remote(DRIVER_SERVER, self.desired_caps)
self.wait = WebDriverWait(self.driver, TIMEOUT)
self.hours_en = 60 * 60 * 1.1 # en控制1.1小时模拟点击一次
self.date_start_en = time.time() # en开始时间
self.date_end_en = 0 # en超过此时间后再次运行
# self.date_end_en = self.date_start_en + self.hours_en # en超过此时间后再次运行
self.week = 60 * 60 * 24 * 7 # 按照周的频率对xd进行token更新
self.week_start_xd = time.time() # xd的开始时间
self.week_end_xd = 0 # 根据周控制频率控制再次开启时间
self.week_start_xiu = time.time() # xd的开始时间
self.week_end_xiu = 0 # 根据周控制频率控制再次开启时间
def login(self):
"""
登录微信
:return:
"""
# 登录按钮
a = time.time()
try:
login = self.wait.until(EC.presence_of_element_located((By.ID, 'com.tencent.mm:id/f34')))
login.click()
except Exception as e:
# print(e)
logging.info(f'failed login {e}')
b = time.time() - a
# print('点击登录', b)
logging.info(f'click login,use time {b}')
# 手机输入
try:
phone = self.wait.until(EC.presence_of_element_located((By.ID, 'com.tencent.mm:id/bem')))
phone.set_text(USERNAME)
except Exception as e:
# print(e)
logging.info(f'something wrong{e}')
c = time.time() - a - b
# print('手机号输入', c)
logging.info(f'send keys phone nums use time {c}')
# 下一步
try:
next = self.wait.until(EC.element_to_be_clickable((By.ID, 'com.tencent.mm:id/dw1')))
next.click()
except Exception as e:
logging.info(f'something wrong{e}')
d = time.time() - a - b - c
logging.info(f'click next bottom use time {c}')
# 密码
password = self.wait.until(EC.presence_of_element_located((By.XPATH, '//*[@text="请填写微信密码"]')))
password.set_text(PASSWORD)
e = time.time() - a - b - c - d
logging.info(f'send keys password use time {e}')
# 提交
# submit = self.wait.until(EC.element_to_be_clickable((By.ID, 'com.tencent.mm:id/dw1')))
submit = self.wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@text="登录"]')))
submit.click()
f = time.time() - a - b - c - d - e
logging.info(f'commit password use time {f}')
def run(self):
"""
入口
:return:
"""
# 滑动之后等待出现en小程序
self.slide_down()
time.sleep(10)
# 点击进入en小程序
self.touch_en()
if self.week_end_xd < self.week_start_xd:
self.week_start_xd = time.time()
self.week_end_xd = self.week_start_xd + self.week
print('xd点击')
self.touch_xd()
elif self.week_end_xiu < self.week_start_xiu:
self.week_end_xiu = time.time() + self.week
print('xiu')
self.touch_xiu()
time.sleep(10)
# 退出小程序
self.driver_closed()
print('driver closed')
emt = EnToken()
token_res = emt.token_2_redis()
if not token_res:
print('需要发送失败消息')
return False
return True
def slide_down(self):
"""
滑动微信屏幕之后点击小程序
:return:
"""
window_size_phone = self.driver.get_window_size()
# print(window_size_phone)
phone_width = window_size_phone.get('width')
phone_height = window_size_phone.get('height')
# print(phone_width, phone_height)
time.sleep(15)
x1 = phone_width * 0.5
y1 = phone_height * 0.7
y2 = phone_height * 0.26
# print('准备向下滑动')
logging.info(f'prepare slide down')
a = time.time()
self.driver.swipe(x1, y2, x1, y1, 2050)
# print('向下滑动完成', time.time() - a)
logging.info(f'slide down success use time {time.time() - a}')
def touch_en(self):
"""
每次进来之后都需要判断是否到了时间,若时间到了之后才可执行点击操作
:param : en 代表en; xd 代表xd; xiu 代表xiu.
:return: None 无返回值
"""
print(self.date_end_en, time.time())
if self.date_end_en < time.time(): # 此时的时候已经超时,需要再次从新进行点击
print('en模拟点击')
# 从新定义开始结束时间
print(self.date_end_en, time.time())
self.date_end_en = time.time() + self.hours_en # 再次更改end time为n小时后
print(self.date_end_en, time.time())
try:
# print('id定位en')
en_app = self.wait.until(
EC.presence_of_element_located((By.XPATH, f"//android.widget.TextView[@text='textname…']")))
# en_master = self.wait.until(EC.presence_of_element_located((By.ID, 'com.tencent.mm:id/hu')))
# en_master = self.wait.until(
# EC.presence_of_element_located((By.XPATH, "//android.widget.TextView[@text='textname']")))
en_app.click()
logging.info(f'located by app_name en')
except Exception as error:
# print(e, 'id定位失败')
logging.info(f'failed located by id:{error}')
time.sleep(20)
# 关闭小程序按钮点击
print('close the en app')
close_button = self.wait.until(EC.presence_of_element_located((By.XPATH, f"//android.widget.FrameLayout[2]/android.widget.ImageButton")))
close_button.click()
print('点击了关闭小程序')
def touch_xd(self):
"""
需要考虑是否已经登录状态还是需要再次登录
:return:
"""
# 点击后进入到小程序
logging.info('click app xd')
xd_app = self.wait.until(EC.presence_of_element_located((By.XPATH, "//android.widget.TextView[@text='textname']")))
xd_app.click()
time.sleep(20)
# 页面出现需要获取到你的定位的时候需要点击允许
print('点击确认获取当前位置')
self.driver.tap([(510, 679)], 500)
# 点击进入到个人中心
time.sleep(10)
logging.info('click personal xd')
self.driver.tap([(540, 1154)], 500)
# 点击快速登录进行登录
time.sleep(10)
logging.info('click login xd')
self.driver.tap([(270, 1030)], 500)
# 点击同意获取头像信息
time.sleep(10)
logging.info('同意获取头像等相关信息')
self.driver.tap([(510, 775)], 500)
time.sleep(20)
# 关闭小程序按钮点击
print('close the guaishou app')
close_button = self.wait.until(
EC.presence_of_element_located((By.XPATH, f"//android.widget.FrameLayout[2]/android.widget.ImageButton")))
close_button.click()
print('结束')
time.sleep(30)
def touch_xiu(self):
"""
xiu模拟点击,需要考虑是否需要登录状态下
:return:
"""
# 点击后进入到小程序
logging.info('click app xiu')
xiu_app = self.wait.until(EC.presence_of_element_located((By.XPATH, "//android.widget.TextView[@text='xiu']")))
xiu_app.click()
# 若页面显示需要确认获取当前位置的话需要点击确认
logging.info('click confirm xiu')
time.sleep(15)
confirm_loc = self.wait.until(
EC.presence_of_element_located((By.XPATH, "//android.widget.Button[@text='确定']")))
confirm_loc.click()
# 点击个人中心
logging.info('click personal xiu')
time.sleep(5)
try:
personal = self.wait.until(
EC.presence_of_element_located((By.XPATH, "//android.view.View[@content-desc='个人中心']")))
personal.click()
except Exception as e:
print(e)
# 点击快速登录进行登录
logging.info('click login xiu')
time.sleep(5)
try:
login = self.wait.until(EC.presence_of_element_located((By.XPATH, "//android.view.View[@content-desc='立即登录']")))
login.click()
except Exception as e:
print('xiu已经登录,不需要再次点击确认登录')
time.sleep(30)
def driver_closed(self):
self.driver.quit()
if __name__ == '__main__':
conn_r = RedisClient(db=10)
count_1 = 0
# start_time = time.time()
# end_time = time.time() + 60 * 60 * 1
we_chat = WeChat()
try:
while 1:
if conn_r.r_size() < 3: # 监控Redis情况,当Redis中无数据后开始运行一次
res = we_chat.run() # 操作微信做操作点击en小程序生成token
if not res:
count_1 += 1
if count_1 > 10:
break # 当失败十次之后跳出循环
# 此处增加限制,每次生成token之后一个小时后才会产生新的token,防止一个token多次使用导致被封号
time.sleep(60*60)
else:
time.sleep(60*60) # 当有数据的时候等待五分钟
we_chat.driver = webdriver.Remote(DRIVER_SERVER, we_chat.desired_caps)
we_chat.wait = WebDriverWait(we_chat.driver, TIMEOUT)
except Exception as e:
msg = f'业务报警:' \
f'\n en获取token出现问题' \
f'\n{e}'
send_msg(msg)
# print(e, type(e))
logging.info(msg)
weixin.py
import os
# 平台
PLATFORM = 'Android'
# 设备名称 通过 adb devices -l 获取
DEVICE_NAME = 'MI_9'
# APP路径
APP = os.path.abspath('.') + '/weixin.apk'
# APP包名
APP_PACKAGE = 'com.tencent.mm'
# 入口类名
APP_ACTIVITY = '.ui.LauncherUI'
# Appium地址
DRIVER_SERVER = 'http://localhost:4723/wd/hub'
# 等待元素加载时间
TIMEOUT = 10
# 微信手机号密码
USERNAME = 'wechatname'
PASSWORD = 'wechatpwd'
# 滑动点
FLICK_START_X = 300
FLICK_START_Y = 300
FLICK_DISTANCE = 700
config.py
以下是处理文件,将token获取到后放到Redis中,或者你可以依照你的想法调整
import re
import os
import logging
from conf.dbr import RedisClient
LOG_FORMAT = "%(asctime)s - %(levelname)s - line:%(lineno)s - msg:%(message)s"
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
# 处理en token到Redis
class EnToken(object):
def __init__(self):
# self.token_path = 'F:\\en.txt'
# self.token_path = 'F:\\xiu.txt'
# self.token_path = 'F:\\xd.txt'
self.conn = RedisClient(db=10) # 解析日维度价格
self.conn_en = RedisClient(db=9) # 解析当前经纬度范围内店铺点位
# 处理en token文件,从文件中读取到token之后只取最后一个,取到之后删除本地文件
@staticmethod
def handle_en_txt():
token_dict = {}
path_token_list = [
('en', '>(e.*?)-->'),
('xd', 'headers-->(.*?)-->'),
('xiu', r'>(\d+)-->'),
]
for i in path_token_list:
token_path = f'F:\\{i[0]}.txt'
token_re = i[-1]
if os.path.exists(token_path):
with open(token_path, mode='r', encoding='utf-8') as f:
token_str = f.read()
# print(token_str)
# token_list = re.findall('>(e.*?)-->', token_str)
# token_list = re.findall('>(Q.*?)-->', token_str)
# token_list = re.findall('>(\d+)-->', token_str)
token_list = re.findall(token_re, token_str)
print(token_list)
if token_list:
token = token_list[-1]
print(token)
token_dict[i[0]] = token
os.remove(token_path) # 删除掉
# return token
else:
# print('file_en_dont_exit')
logging.info('file_en_dont_exit')
return token_dict
# 将token放到Redis中
def token_2_redis(self):
"""
假如token存在的话 则根据token的最后几位做key放入到Redis中
:return:
"""
token_dict = self.handle_en_txt()
print(token_dict)
if token_dict:
for token_items in token_dict.items():
token_key = token_items[0]
token_val = token_items[-1]
self.conn.set(token_key, token_val, over_time=None)
# self.conn.set(token_key, token, over_time=60*65) # 设置有效时长65分钟之后失效
# self.conn_en.set(token_key, token, over_time=60*65) # 设置有效时长65分钟之后失效
logging.info(f'token success {token_key,token_val}')
return True
else:
logging.info('token dons"t exist')
self.conn.close()
self.conn_en.close()
if __name__ == '__main__':
en = EnToken()
en.token_2_redis()
handle_file.py
二、配置fiddler获取请求头的信息写到本地文件
修改fiddlerscript添加以下内容,在做数据请求的以下增加下面内容
if (oSession.oRequest["Host"]=="这里是请求的host") {
var filename = "F:\en.txt";
var curDate = new Date();
var logContent = 'en' + "[" + curDate.toLocaleString() + "]";
var sw : System.IO.StreamWriter;
if (System.IO.File.Exists(filename)){
sw = System.IO.File.AppendText(filename);
sw.Write(logContent + 'oSession.oRequest.headers-->' + oSession.oRequest.headers['x-wx-token'] + '-->' + oSession.oRequest.headers +'\n');
// sw.Write("Request header:" + "\n" + oSession.oRequest.headers);
// sw.Write(wap_s + '\n\n')
}
else{
sw = System.IO.File.CreateText(filename);
sw.Write(logContent + 'oSession.oRequest.headers-->' + oSession.oRequest.headers['x-wx-token'] + '-->' + '\n');
// sw.Write("Request header:" + "\n" + oSession.oRequest.headers);
// sw.Write(wap_s + '\n\n')
}
sw.Close();
sw.Dispose();
}
fiddler
三、主爬虫业务代码
此处按照自己的需求逻辑调整自己的业务代码。
如果对你有所帮助就请作者喝杯咖啡吧😊
?本文章同步发表于博客园
|