Commit d6410378 by LiuLiYuan

WeChat official accounts 06/28

Parent 29d5214b
# Collect article-list data from WeChat official accounts
import json
import time
import random
import urllib.parse
import pymysql
import redis
import requests
import urllib3
from pymysql.converters import escape_string
import sys
# sys.path.append('D:\\zzsn\\base')
from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
s = requests.session()
cnx = pymysql.connect(host="114.116.44.11", user="caiji", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
r = baseCore.r
def resHtml(token, url, cookies):
    try:
        ip = baseCore.get_proxy()
        s = requests.session()
        cookie_jar = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
        s.cookies = cookie_jar
        json_search = s.get(url, headers=headers, proxies=ip, verify=False).json()
        # Persist the refreshed cookies so the next request reuses a live session
        aa = s.cookies.get_dict()
        updateCookieToken(token, json.dumps(aa))
    except Exception as e:
        log.error(f'Request failed: {e}')
        json_search = {}
    return json_search
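
# Optional sketch (not called anywhere): a thin retry wrapper around resHtml.
# resHtml signals failure by returning {}, so a caller can retry a few times
# before giving up. The attempt count and sleep interval are illustrative
# assumptions, not values taken from this repo.
def resHtmlWithRetry(token, url, cookies, attempts=3):
    for i in range(attempts):
        json_search = resHtml(token, url, cookies)
        if json_search:
            return json_search
        log.info(f'Request attempt {i + 1}/{attempts} failed, retrying')
        time.sleep(5)
    return {}
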
# Re-queue an official account that failed to collect back into redis
def rePutIntoR(item):
    r.rpush('WeiXinGZH:infoSourceCode', item)
# Look up the source record for an official account
def getSourceInfo(infoSourceCode):
    sql = "SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = %s"
    cursor.execute(sql, (infoSourceCode,))
    row = cursor.fetchone()
    cnx.commit()
    if not row:
        log.info(f'---{infoSourceCode}---no matching info_source record')
        return False
    dic_url = {
        'url_': row[0],
        'sid': row[1],
        'name': row[2],
        'info_source_code': row[3],
        'biz': ''
    }
    url_ = dic_url['url_']
    origin = dic_url['name']
    info_source_code = dic_url['info_source_code']
    sid = dic_url['sid']
    try:
        biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
        dic_url['biz'] = biz
    except Exception as e:
        log.info(f'---official account--{origin}---bad biz parameter')
        error = [
            origin,
            url_,
            info_source_code,
            str(e),
            'bad biz'
        ]
        insertSql = "insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
        cursor_.execute(insertSql, tuple(error))
        cnx_.commit()
        return False
    return dic_url
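
# Optional sketch (not called anywhere): the same biz extraction done with
# urllib.parse instead of chained splits. Assumes the account URL carries the
# biz as a query parameter, e.g. ...?__biz=MjM5MTcwNDQwMQ==&... (hypothetical
# value); parse_qs also handles a percent-encoded %3D%3D suffix.
def extractBiz(url_):
    query = urllib.parse.urlparse(url_).query
    params = urllib.parse.parse_qs(query)
    biz = params.get('__biz', [''])[0]
    # Strip the trailing '==' padding, mirroring the splits above
    return biz.rstrip('=')
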
# Persist an error record
def insertBadSql(error):
    insertSql = "insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
    cursor_.execute(insertSql, tuple(error))
    cnx_.commit()

# Token bookkeeping
def updateTokeen(token, type):
    if type == 2:
        # Session expired: delete the token
        cursor_.execute("delete from weixin_tokenCookies where token=%s", (token,))
    if type == 1:
        # Account banned: record the ban time
        cursor_.execute("update weixin_tokenCookies set fenghao_time=now() where token=%s", (token,))
    if type == 3:
        # Refresh the last-used time
        cursor_.execute("update weixin_tokenCookies set update_time=now() where token=%s", (token,))
    cnx_.commit()

# Persist refreshed cookies for a token
def updateCookieToken(token, cookies):
    cursor_.execute("update weixin_tokenCookies set cookies=%s where token=%s", (cookies, token))
    cnx_.commit()
# Fetch the least-recently-used token whose ban time is over 2 hours old
def getToken():
    cursor_.execute(
        "select token, cookies, user_name from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
    row = cursor_.fetchall()
    cnx_.commit()
    if not row:
        # No usable token in the pool
        log.info("No token available")
        return False
    return row[0]
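
# Optional sketch (not called anywhere): claim a token and mark it used in one
# step, so the `order by update_time asc` rotation cycles through accounts
# fairly. This pairs getToken() with the otherwise-unused type-3 branch of
# updateTokeen(); whether to touch on claim or only on success is a policy
# choice, not something this repo prescribes.
def getTokenAndTouch():
    tokenAndCookie = getToken()
    if tokenAndCookie:
        updateTokeen(tokenAndCookie[0], 3)
    return tokenAndCookie
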
# Collect list data for one official account
def getWxList(infoSourceCode, dic_user_count):
    dic_url = getSourceInfo(infoSourceCode)
    log.info(f"======{infoSourceCode}----collection started=======")
    if not dic_url:
        log.info(f'======{infoSourceCode}---------bad biz for this account, please check=======')
        error = ['', '', infoSourceCode, '', 'bad biz for this account']
        insertBadSql(error)
        # Return the counts unchanged so the caller's dict is never lost
        return dic_user_count
    origin = dic_url['name']
    biz = dic_url['biz']
    dic_user_count = spider(dic_url, dic_user_count)
    log.info(f"======{origin}-----{biz}----collection finished=======")
    return dic_user_count
def getFromSql():
    selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
    cursor.execute(selectSql)
    results = cursor.fetchall()
    cnx.commit()
    result_list = [item[0] for item in results]
    time.sleep(20)
    # Push the codes into redis
    for item in result_list:
        r.rpush('WeiXinGZH:infoSourceCode', item)

# Number of official accounts queued in redis
def getnumber_redis():
    length = r.llen('WeiXinGZH:infoSourceCode')
    return length
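
# Optional sketch (not called anywhere): how the queue above is consumed.
# getFromSql() produces with rpush (tail) and __main__ drains with
# baseCore.redicPullData, giving FIFO order; purely illustrative.
def drainQueue():
    while getnumber_redis() > 0:
        code = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
        if code is None or code == 'None':
            break
        log.info(f'Pulled info_source_code {code}')
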
# Insert one article link; returns 1 if newly inserted, 0 if it already exists
def insertWxList(dic_url, news_title, url_news, news_date):
    selectCountSql = "select count(1) from wx_link where link=%s"
    cursor_.execute(selectCountSql, (url_news,))
    count = cursor_.fetchone()[0]
    if count > 0:
        # Already collected
        return 0
    try:
        insertSql = "insert into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) values " \
                    "(%s,%s,%s,%s,%s,%s,%s,0,now())"
        cursor_.execute(insertSql, (dic_url['sid'], dic_url['url_'], dic_url['name'],
                                    dic_url['info_source_code'], news_title, news_date, url_news))
        cnx_.commit()
    except Exception as e:
        log.error(f"Failed to save to database: {e}")
        return 0
    # Fetch the id of the row just inserted
    selectIdSql = "select id from wx_link where sid=%s and link=%s"
    cursor_.execute(selectIdSql, (dic_url['sid'], url_news))
    linkid = cursor_.fetchone()[0]
    # TODO: push the link id into redis; use a fallback connection if r is down
    try:
        r.ping()
    except:
        r_ = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
        r_.lpush('WeiXinGZH:url', linkid)
    r.lpush('WeiXinGZH:linkid', linkid)
    # Inserted successfully
    return 1
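
# Optional sketch (not called anywhere): the select-then-insert above can race
# when several workers collect the same account. Assuming a UNIQUE index on
# wx_link.link (an assumption, not confirmed by this repo), MySQL's INSERT
# IGNORE collapses the existence check and the insert into one statement.
def insertWxListAtomic(dic_url, news_title, url_news, news_date):
    insertSql = "insert ignore into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) " \
                "values (%s,%s,%s,%s,%s,%s,%s,0,now())"
    cursor_.execute(insertSql, (dic_url['sid'], dic_url['url_'], dic_url['name'],
                                dic_url['info_source_code'], news_title, news_date, url_news))
    cnx_.commit()
    return cursor_.rowcount  # 1 if newly inserted, 0 if the link already existed
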
def spider(dic_url, dic_user_count):
    url_ = dic_url['url_']
    origin = dic_url['name']
    info_source_code = dic_url['info_source_code']
    biz = dic_url['biz']
    fakeid = biz + '=='
    linke_list = []
    begin = 0
    flgB = False
    while True:
        tokenAndCookie = getToken()
        if not tokenAndCookie:
            log.info("No token available, waiting for one")
            while True:
                log.info("No token available, sleeping")
                time.sleep(60)
                log.info("No token available, sleep finished")
                tokenAndCookie = getToken()
                if tokenAndCookie:
                    break
        user_name = tokenAndCookie[2]
        token = tokenAndCookie[0]
        log.info(f"Got token----{token}----{user_name}")
        cookies = json.loads(tokenAndCookie[1])
        url = f'https://mp.weixin.qq.com/cgi-bin/appmsgpublish?sub=list&search_field=null&begin={begin}&count=5&query=&fakeid={urllib.parse.quote(fakeid)}&type=101_1&free_publish_type=1&sub_action=list_ex&token={token}&lang=zh_CN&f=json&ajax=1'
        json_search = resHtml(token, url, cookies)
        # resHtml returns {} on a failed request; route that to the catch-all branch
        ret = json_search.get('base_resp', {}).get('ret', -1)
        str_t = json.dumps(json_search)
        if ret == 0:
            if user_name in dic_user_count:
                dic_user_count[user_name] += 1
            else:
                dic_user_count[user_name] = 1
        elif ret == 200013:
            log.info(f'======{origin}-----{biz}----account {user_name} is banned=======')
            updateTokeen(token, 1)
            time.sleep(5)
            continue
        elif ret == 200002:
            log.info(f'======{origin}-----{biz}----invalid biz for this account, please check=======')
            error = [origin, url_, info_source_code, str_t, 'invalid biz parameter']
            insertBadSql(error)
            # Retrying with another token cannot fix a bad biz, so give up on this account
            return dic_user_count
        elif ret == 200003:
            log.info(f'======{origin}-----{biz}----account {user_name} has an invalid session=======')
            # Session expired: drop the token
            updateTokeen(token, 2)
            error = [origin, url_, info_source_code, str_t, 'invalid session']
            insertBadSql(error)
            time.sleep(5)
            continue
        elif ret == 200074:
            # {"base_resp": {"ret": 200074, "err_msg": "default"}}
            log.info(f'======{origin}-----{biz}----account {user_name} is not logged in=======')
            # Session expired: drop the token
            updateTokeen(token, 2)
            error = [origin, url_, info_source_code, str_t, f'account {user_name} not logged in']
            insertBadSql(error)
            time.sleep(5)
            continue
        else:
            log.info(f'======{origin}-----{biz}----account {user_name} hit another error=======')
            error = [origin, url_, info_source_code, str_t, 'other error']
            insertBadSql(error)
            updateTokeen(token, 2)
            time.sleep(5)
            continue
        data_json = json.loads(json_search['publish_page'])
        total_count = data_json['total_count']
        publish_list = data_json['publish_list']
        for publish_data in publish_list:
            publish_info = json.loads(publish_data['publish_info'])
            appmsgex = publish_info['appmsgex']
            for msgex in appmsgex:
                title = msgex['title']
                link = msgex['link']
                publish_date = time.localtime(msgex['update_time'])
                publish_date = time.strftime('%Y-%m-%d %H:%M:%S', publish_date)
                if link not in linke_list:
                    linke_list.append(link)
                    flgA = insertWxList(dic_url, title, link, publish_date)
                    if flgA == 0:
                        # Hit a link we already have: stop paging this account
                        flgB = True
                        break
            if flgB:
                break
        if flgB:
            break
        if len(linke_list) >= total_count:
            break
        begin += 5
        time.sleep(20)
    return dic_user_count
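
# Optional sketch (not called anywhere): the shape of the payload spider()
# walks, reconstructed from the parsing above. publish_page is a JSON string
# whose publish_list entries each hold another JSON string, publish_info.
# All sample values are hypothetical.
def _demoParsePublishPage():
    json_search = {
        'base_resp': {'ret': 0},
        'publish_page': json.dumps({
            'total_count': 1,
            'publish_list': [
                {'publish_info': json.dumps({
                    'appmsgex': [
                        {'title': 'Sample title',
                         'link': 'https://mp.weixin.qq.com/s/xxxx',
                         'update_time': 1688054400}
                    ]
                })}
            ]
        })
    }
    data_json = json.loads(json_search['publish_page'])
    for publish_data in data_json['publish_list']:
        publish_info = json.loads(publish_data['publish_info'])
        for msgex in publish_info['appmsgex']:
            print(msgex['title'], msgex['link'],
                  time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msgex['update_time'])))
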
if __name__ == "__main__":
    # getFromSql()
    numbers = getnumber_redis()
    log.info("Official accounts queued in this batch: {}".format(numbers))
    time.sleep(3)
    dic_user_count = {}
    start = time.time()
    log.info(f"Start time {baseCore.getNowTime(1)}")
    while True:
        infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
        if infoSourceCode is None or infoSourceCode == 'None':
            log.info("redis queue is empty, re-seeding it")
            log.info(f"Finished one full round of accounts in {baseCore.getTimeCost(start, time.time())}")
            getFromSql()
            break
        dic_user_count = getWxList(infoSourceCode, dic_user_count)
    if dic_user_count:
        for key, value in dic_user_count.items():
            log.info(f"====account {key}, used {value} times")