# Commit 8265a6f6  Author: 薛凌堃
#
# Merge remote-tracking branch 'origin/master'

# WeChat Official Account list-page scraper: pulls article list pages for
# each configured account and stores the article links for later collection.
import json
import time
import random
import pymysql
import requests
import urllib3
from pymysql.converters import escape_string
from base.BaseCore import BaseCore
# All HTTP calls below use verify=False, so suppress the resulting
# InsecureRequestWarning noise.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
s = requests.session()
# NOTE(review): database credentials are hard-coded here; consider moving
# them into configuration / secrets management.
cnx = pymysql.connect(host="114.116.44.11", user="caiji", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
# Second MySQL connection plus the redis client are provided by BaseCore.
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
r = baseCore.r
def resHtml(token, url, cookies):
    """GET `url` through a proxy using the given cookie dict; return the JSON body.

    Any cookies the server hands back are persisted to the token's row via
    updateCookieToken so the next use of this token starts from fresh session
    state. Returns {} on any failure (network error, bad JSON, ...).
    """
    json_search = {}
    try:
        ip = baseCore.get_proxy()
        # Fresh session per request; don't shadow/reuse the module-level `s`.
        session = requests.session()
        session.cookies = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
        json_search = session.get(url, headers=headers, proxies=ip, verify=False).json()
        # Persist refreshed cookies for this token.
        updateCookieToken(token, json.dumps(session.cookies.get_dict()))
    except Exception as e:
        # Best-effort fetch: log the failure (the original swallowed it
        # silently) and return an empty dict so callers treat it as no data.
        log.info(f'resHtml request failed: {e}')
        json_search = {}
    return json_search
# Requeue accounts whose collection failed so they get retried later.
def rePutIntoR(item):
    """Push an info_source_code back onto the redis work queue."""
    r.rpush('WeiXinGZH:infoSourceCode', item)
# Look up one account's configuration row and extract its biz parameter.
def getSourceInfo(infoSourceCode):
    """Return {'url_', 'sid', 'name', 'info_source_code', 'biz'} for the account,
    or False when the account is missing or its URL has no parsable biz value
    (such failures are recorded in WeixinGZH via insertBadSql).
    """
    # Parameterized query: infoSourceCode comes from redis and must not be
    # interpolated into the SQL string directly.
    sql = "SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = %s"
    cursor.execute(sql, (infoSourceCode,))
    row = cursor.fetchone()
    if row is None:
        # Unknown code — the original crashed with TypeError here.
        log.info(f'---{infoSourceCode}---未查询到该公众号')
        return False
    dic_url = {
        'url_': row[0],
        'sid': row[1],
        'name': row[2],
        'info_source_code': row[3],
        'biz': ''
    }
    url_ = dic_url['url_']
    origin = dic_url['name']
    info_source_code = dic_url['info_source_code']
    try:
        # The biz id sits between "__biz=" and the following "=" / "==&".
        biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
        dic_url['biz'] = biz
    except Exception as e:
        log.info(f'---公众号--{origin}---biz错误')
        # str(e): pymysql cannot reliably serialize a raw exception object.
        error = [origin, url_, info_source_code, str(e), 'biz错误']
        # Reuse the shared error logger instead of duplicating its SQL.
        insertBadSql(error)
        return False
    return dic_url
# Record one scraping failure in the WeixinGZH audit table.
def insertBadSql(error):
    """Insert (site_name, site_url, info_source_code, json_error_info, error_type)."""
    sql = ("insert into WeixinGZH (site_name,site_url,info_source_code,"
           "json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())")
    cursor_.execute(sql, tuple(error))
    cnx_.commit()
# Persist one page of article links for an account.
def insertWxList(dic_url, json_search, page):
    """Store new wx_link rows for this page; return True when paging should stop.

    Stops when the page is empty (walked past the last article) or when at
    least half the entries already exist (we caught up with earlier runs).
    """
    list_all_news = json_search['app_msg_list']
    listCount = 0
    repetCount = 0
    insertCount = 0
    for one_news in list_all_news:
        listCount += 1
        news_title = one_news['title']
        # create_time is a unix timestamp; store as "YYYY-mm-dd HH:MM:SS".
        news_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(one_news['create_time']))
        url_news = one_news['link']
        # Parameterized queries throughout: titles/links are untrusted remote
        # data (the original string-built its SQL, escaping only some values).
        cursor_.execute("select count(1) from wx_link where link=%s", (url_news,))
        if cursor_.fetchone()[0] > 0:
            repetCount += 1
            continue
        insertCount += 1
        cursor_.execute(
            "insert into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) "
            "values (%s,%s,%s,%s,%s,%s,%s,0,now())",
            (dic_url['sid'], dic_url['url_'], dic_url['name'], dic_url['info_source_code'],
             news_title, news_date, url_news))
        cnx_.commit()
    log.info(f"---{dic_url['name']}--第{page}页----总数:{listCount}---重复数:{repetCount}---新增数:{insertCount}-------------")
    if listCount == 0:
        # Empty page: no more articles.
        return True
    if repetCount >= listCount / 2:
        # Mostly duplicates: assume the remainder was already collected.
        return True
    # Keep paging.
    return False
# Token bookkeeping after each use.
def updateTokeen(token, type):
    """Update the weixin_tokenCookies row for `token`.

    type == 2: session expired — delete the token row.
    type == 1: account banned — stamp fenghao_time (starts the 2h cooldown).
    type == 3: normal use — stamp update_time (LRU ordering for getToken).

    NOTE(review): the misspelled name ("Tokeen") is kept because callers in
    this file use it.
    """
    # Parameterized: never interpolate the token value into the SQL string.
    if type == 2:
        cursor_.execute("delete from weixin_tokenCookies where token=%s", (token,))
    if type == 1:
        cursor_.execute("update weixin_tokenCookies set fenghao_time=now() where token=%s", (token,))
    if type == 3:
        cursor_.execute("update weixin_tokenCookies set update_time=now() where token=%s", (token,))
    cnx_.commit()
# Persist refreshed cookies for a token.
def updateCookieToken(token, cookies):
    """Store the cookie JSON string on the token's weixin_tokenCookies row."""
    # Parameterized query replaces the manual escape_string + f-string combo.
    cursor_.execute("update weixin_tokenCookies set cookies=%s where token=%s", (cookies, token))
    cnx_.commit()
# Pick a usable token.
def getToken():
    """Return the least-recently-used (token, cookies_json) tuple whose ban
    cooldown (2 hours since fenghao_time) has elapsed, or False when none
    is currently available.
    """
    cursor_.execute("select token,cookies from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
    # fetchone() is the idiom for a LIMIT 1 query (was fetchall()[0]).
    row = cursor_.fetchone()
    if row is None:
        # Caller is expected to wait and retry.
        return False
    return row
# Fetch and store one list page for an account.
def getPageData(dic_url, page):
    """Fetch list page `page`; return True to stop paging, False to continue.

    On ban (ret 200013) or expired session (ret 200003) the token is marked
    accordingly and the call recurses to retry with another token. Bad biz
    (200002) and unknown errors are recorded and end paging for the account.
    """
    url_ = dic_url['url_']
    origin = dic_url['name']
    info_source_code = dic_url['info_source_code']
    biz = dic_url['biz']
    fakeid = biz + '=='
    # Block until a usable token exists; banned tokens recover after 2h.
    tokenAndCookie = getToken()
    while not tokenAndCookie:
        time.sleep(60)
        tokenAndCookie = getToken()
    token = tokenAndCookie[0]
    log.info(f"获取token到----{token}")
    cookies = json.loads(tokenAndCookie[1])
    url = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={(page - 1) * 5}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
    json_search = resHtml(token, url, cookies)
    str_t = json.dumps(json_search)
    # resHtml returns {} on request failure; .get() routes that to the
    # "other error" branch instead of raising KeyError (which previously
    # propagated up and killed the main loop).
    ret = json_search.get('base_resp', {}).get('ret')
    if ret == 0:
        pass
    elif ret == 200013:
        # Token is rate-limited/banned: stamp it and retry with another.
        log.info(f'======{origin}-----{biz}----该账号被封=======')
        updateTokeen(token, 1)
        return getPageData(dic_url, page)
    elif ret == 200002:
        # Bad biz parameter: unrecoverable for this account.
        log.info(f'======{origin}-----{biz}----该账号biz错误,请检查=======')
        error = [origin, url_, info_source_code, str_t, '无效biz参数']
        insertBadSql(error)
        return True
    elif ret == 200003:
        # Session expired: drop this token and retry with another.
        log.info(f'======{origin}-----{biz}----该账号无效session=======')
        updateTokeen(token, 2)
        error = [origin, url_, info_source_code, str_t, '无效session']
        insertBadSql(error)
        return getPageData(dic_url, page)
    else:
        log.info(f'======{origin}-----{biz}----该账号其他错误=======')
        error = [origin, url_, info_source_code, str_t, '其他错误']
        insertBadSql(error)
        return True
    # Success: mark the token as recently used, then store the article list.
    updateTokeen(token, 3)
    return insertWxList(dic_url, json_search, page)
# Collect one account's article lists (up to 10 pages).
def getWxList(infoSourceCode):
    """Drive getPageData over pages 1..10 for the account, with random pacing."""
    dic_url = getSourceInfo(infoSourceCode)
    log.info(f"======{infoSourceCode}----开始采集=======")
    # BUG FIX: validate the lookup result BEFORE indexing into it. The
    # original read dic_url['name'] first, so a False return from
    # getSourceInfo raised TypeError and this branch was unreachable.
    if not dic_url:
        log.info(f'======{infoSourceCode}---------该账号biz错误,请检查=======')
        error = ['', '', infoSourceCode, '', '该账号biz错误']
        insertBadSql(error)
        return
    origin = dic_url['name']
    biz = dic_url['biz']
    for page in range(1, 11):
        retFlag = getPageData(dic_url, page)
        # Randomized pause between pages to avoid tripping rate limits.
        time.sleep(random.randint(60, 181))
        if retFlag:
            # Reached old/duplicate content — done with this account.
            break
    log.info(f"======{origin}-----{biz}----结束采集=======")
def getFromSql():
    """Seed the redis work queue with every WeChat account code in info_source."""
    cursor.execute("SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'")
    # Push each code straight onto the queue.
    for (code,) in cursor.fetchall():
        r.rpush('WeiXinGZH:infoSourceCode', code)
if __name__ == "__main__":
    # Main loop: pull account codes from redis forever; when the queue runs
    # dry, reseed it from MySQL and try again.
    while True:
        infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
        # The queue may yield the literal string 'None' as well as a real None.
        if infoSourceCode is None or infoSourceCode == 'None':
            log.info("redis已经没有数据了,重新放置数据")
            getFromSql()
            time.sleep(10)
            # BUG FIX: loop back and pull through the same validation — the
            # original re-pulled here and could pass None into getWxList.
            continue
        getWxList(infoSourceCode)
        # infoSourceCode = 'IN-20220917-0159'
        # getWxList(infoSourceCode)
# --- Non-code residue from the hosting web page (commented out so the file parses) ---
# Markdown 格式
# 0%
# 您添加了 0 到此讨论。请谨慎行事。
# 请先完成此评论的编辑!
# 注册 或者 后发表评论