Commit a285ef8d by 薛凌堃

WeChat official accounts

Parent 1d84da3a
@@ -212,76 +212,30 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
continue
return list_all_info,num_caiji
def job(count,key):
# Refresh the browser and fetch the current token and cookies
token, cookies = flushAndGetToken(list_b)
log.info('=========== fetching official account ============')
def RequestUrl(dic_url,token,key):
start_ = time.time()
#todo: pop one entry from redis
infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
if infoSourceCode == 'None' or infoSourceCode is None:
# after a full round has been collected, re-insert the data and wait for the insert to finish
getFromSql()
time.sleep(20)
log.info(f'======== this round of official accounts is finished, {count} accounts collected ========= total time: {baseCore.getTimeCost(start_,time.time())}')
return count
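baseCore.redicPullData evidently pops one info_source_code off a Redis list, and getFromSql refills the queue once it runs dry. A minimal sketch of that pop/refill/requeue pattern with redis-py (connection details and helper names are assumptions based on the code above):

```python
import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # connection details assumed

def pull_code(key='WeiXinGZH:infoSourceCode'):
    # LPOP returns None on an empty queue, which signals a refill pass
    raw = r.lpop(key)
    return raw.decode() if raw else None

def requeue(code, key='WeiXinGZH:infoSourceCode'):
    # push a failed code back so it is retried on a later round (cf. rePutIntoR)
    r.rpush(key, code)
```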
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
# '一带一路百人论坛'
# sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
cursor.execute(sql)
row = cursor.fetchone()
dic_url = {
'url_': row[0],
'sid': row[1],
'name': row[2],
'info_source_code': row[3],
'biz': ''
}
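The lookup above interpolates infoSourceCode directly into the SQL string. The value comes from Redis rather than user input, but a parameterized query is still the safer habit; a sketch against the same table, reusing the cursor from the surrounding code:

```python
sql = ("SELECT site_uri, id, site_name, info_source_code "
       "FROM info_source WHERE info_source_code = %s")
cursor.execute(sql, (infoSourceCode,))
row = cursor.fetchone()
```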
log.info('=========== extracting biz ==========')
s.cookies.update(cookies)
s.keep_alive = False
url_ = dic_url['url_']
origin = dic_url['name']
info_source_code = dic_url['info_source_code']
sid = dic_url['sid']
try:
biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
dic_url['biz'] = biz
except Exception as e:
log.info(f'---official account--{origin}---biz error')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
error = [
origin,
url_,
info_source_code,
e,
'biz error',
time_now
]
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return count
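The chained split on '__biz=' above breaks if the query parameters arrive in a different order or the base64 padding varies. A sketch of the same extraction using only the standard library:

```python
from urllib.parse import urlparse, parse_qs

def extract_biz(url):
    # parse_qs tolerates any parameter order; rstrip drops the '=' padding,
    # matching what the chained splits above produce
    values = parse_qs(urlparse(url).query).get('__biz', [])
    return values[0].rstrip('=') if values else None
```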
biz = dic_url['biz']
fakeid = biz + '=='
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# get the page count
ret = -1
json_search = ''
try:
# ip = baseCore.get_proxy()
json_search = s.get(url_search, headers=headers,
verify=False).json() # , proxies=ip, verify=False
str_t = json.dumps(json_search)
time.sleep(1)
except Exception as e:
log.error(f'=== official account {origin} request failed! current time: {baseCore.getNowTime(1)}======={e}===')
rePutIntoR(info_source_code)
time.sleep(20)
return count
return json_search,ret
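The s.get call above carries no timeout, so one stalled connection blocks the whole worker before the account is requeued. A sketch of the same request with a timeout and simple backoff (requests API only; the retry policy is an assumption):

```python
import time
import requests

def get_json(session, url, headers, retries=3, timeout=15):
    # retry transient failures with exponential backoff before giving up
    for attempt in range(retries):
        try:
            return session.get(url, headers=headers,
                               timeout=timeout, verify=False).json()
        except (requests.RequestException, ValueError):
            time.sleep(2 ** attempt)
    return None
```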
ret = json_search['base_resp']['ret']
# {"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
# TODO: inspect the return code to tell an account ban apart from a biz error
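From the branches below: ret == 0 is success, 200002 marks a bad account link, and 200003 an expired session. A small lookup table makes that dispatch explicit (labels mirror the branches in this diff):

```python
RET_MEANINGS = {
    0: 'ok',
    200002: 'invalid official-account link',
    200003: 'invalid session (login expired)',
}

def classify_ret(ret):
    # unknown codes fall through to a generic bucket, as in the else branch below
    return RET_MEANINGS.get(ret, f'other error (ret={ret})')
```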
@@ -304,7 +258,7 @@ def job(count,key):
# browser_run.refresh()
r.set(key, 50)
r.expire(key, 5400)
return count
return json_search,ret
elif ret == 200002:
# bad official-account link: save the error info and error type to the DB
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -320,7 +274,7 @@ def job(count,key):
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
log.info(f'official account----{origin}----took {baseCore.getTimeCost(start_, time.time())}')
return count
return json_search,ret
elif ret == 200003:
# invalid session
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -336,7 +290,7 @@ def job(count,key):
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
log.info(f'official account----{origin}----took {baseCore.getTimeCost(start_, time.time())}')
return count
return json_search,ret
else:
log.info(f'----other case-----{json_search}---official account {origin}------')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -351,80 +305,132 @@ def job(count,key):
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return json_search,ret
def job(count,key):
# Refresh the browser and fetch the current token and cookies
token, cookies = flushAndGetToken(list_b)
log.info('=========== fetching official account ============')
start_ = time.time()
#todo: pop one entry from redis
infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
if infoSourceCode == 'None' or infoSourceCode is None:
# after a full round has been collected, re-insert the data and wait for the insert to finish
getFromSql()
time.sleep(20)
log.info(f'======== this round of official accounts is finished, {count} accounts collected ========= total time: {baseCore.getTimeCost(start_,time.time())}')
return count
try:
Max_data = int(json_search['app_msg_cnt'])
Max_page = int(int(json_search['app_msg_cnt']) / 5)
if int(json_search['app_msg_cnt']) % 5 != 0:
Max_page = Max_page + 1
else:
Max_page = Max_page
except:
Max_page = 1
Max_data = 5
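The try block above is ceiling division: app_msg_cnt articles at five per page, rounding up when there is a partial page. math.ceil states that intent directly:

```python
import math

def page_count(app_msg_cnt, per_page=5):
    # e.g. 23 articles -> 5 pages, 20 articles -> 4 pages
    return math.ceil(app_msg_cnt / per_page)
```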
log.info(f'start collecting {origin}-----{Max_page} pages---{Max_data} records in total-----')
for i in range(0, Max_data, 5):
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
try:
# ip = get_proxy()[random.randint(0, 3)]
json_search = s.get(url_search, headers=headers,
verify=False).json() # , proxies=ip, verify=False
str_t = json.dumps(json_search)
time.sleep(2)
except Exception as e:
log.error(f'=== official account {origin} request failed! current time: {baseCore.getNowTime(1)}======={e}===')
rePutIntoR(info_source_code)
return count
list_all = json_search['app_msg_list']
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
# '一带一路百人论坛'
# sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
cursor.execute(sql)
row = cursor.fetchone()
dic_url = {
'url_': row[0],
'sid': row[1],
'name': row[2],
'info_source_code': row[3],
'biz': ''
}
log.info('=========== extracting biz ==========')
s.cookies.update(cookies)
s.keep_alive = False
url_ = dic_url['url_']
origin = dic_url['name']
info_source_code = dic_url['info_source_code']
sid = dic_url['sid']
try:
biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
dic_url['biz'] = biz
except Exception as e:
log.info(f'---official account--{origin}---biz error')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
error = [
origin,
url_,
info_source_code,
e,
'biz error',
time_now
]
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return count
json_search,ret = RequestUrl(dic_url,token,key)
if ret == 0:
try:
# start collecting article info for each page
page = int(i/5+1)
log.info(f'---{origin}---------start collecting page {page}-----------')
list_all_info,num_caiji= get_info(sid,json_search,origin,url_,info_source_code,page)
print(f'----articles collected on page {page}-----{len(list_all_info)}------{num_caiji}-------')
time.sleep(2)
if len(list_all_info) != 0:
count += 1
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
success = [
origin,
url_,
info_source_code,
'collected successfully',
num_caiji,
time_now,
]
# save the success record
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(success))
cnx_.commit()
# all articles for this official account are collected
log.info(f'---articles collected on page {page}---{len(list_all_info)}---{num_caiji}---took {baseCore.getTimeCost(start_,time.time())}')
Max_data = int(json_search['app_msg_cnt'])
Max_page = int(int(json_search['app_msg_cnt']) / 5)
if int(json_search['app_msg_cnt']) % 5 != 0:
Max_page = Max_page + 1
else:
Max_page = Max_page
except:
Max_page = 1
Max_data = 5
log.info(f'start collecting {origin}-----{Max_page} pages---{Max_data} records in total-----')
for i in range(0, Max_data, 5):
json_search,ret = RequestUrl(dic_url,token,key)
if ret == 0:
pass
else:
log.info(f'----articles collected on page {page}: {num_caiji}--URL already exists!-----took {baseCore.getTimeCost(start_,time.time())}')
return count
except Exception as e:
# the account's JSON parsed fine but collecting the data failed
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
origin,
url_,
info_source_code,
e,
'collection failed',
time_now
]
# save the failure record
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
log.info(f'{fakeid}, official account {origin}: collection failed!!!!!! took {baseCore.getTimeCost(start_, time.time())}')
count += 1
log.info(f'{fakeid}, official account {origin}: collected successfully! {count} accounts collected so far, took {baseCore.getTimeCost(start_, time.time())}')
if json_search != '':
# list_all = json_search['app_msg_list']
try:
# start collecting article info for each page
page = int(i/5+1)
log.info(f'---{origin}---------start collecting page {page}-----------')
list_all_info,num_caiji= get_info(sid,json_search,origin,url_,info_source_code,page)
print(f'----articles collected on page {page}-----{len(list_all_info)}------{num_caiji}-------')
time.sleep(2)
if len(list_all_info) != 0:
count += 1
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
success = [
origin,
url_,
info_source_code,
'collected successfully',
num_caiji,
time_now,
]
# save the success record
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(success))
cnx_.commit()
# all articles for this official account are collected
log.info(f'---articles collected on page {page}---{len(list_all_info)}---{num_caiji}---took {baseCore.getTimeCost(start_,time.time())}')
else:
log.info(f'----articles collected on page {page}: {num_caiji}--URL already exists!-----took {baseCore.getTimeCost(start_,time.time())}')
return count
except Exception as e:
# the account's JSON parsed fine but collecting the data failed
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
origin,
url_,
info_source_code,
e,
'collection failed',
time_now
]
# save the failure record
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
log.info(f'{biz}, official account {origin}: collection failed!!!!!! took {baseCore.getTimeCost(start_, time.time())}')
count += 1
log.info(f'{biz}, official account {origin}: collected successfully! {count} accounts collected so far, took {baseCore.getTimeCost(start_, time.time())}')
return count
else:
return count
time.sleep(2)
return count
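Both the success and failure paths above build a six-element list and run a near-identical INSERT into WeixinGZH. A hypothetical save_record helper could fold the two paths together (column names are taken from the statements in this diff; the helper itself is not part of the commit):

```python
import time

def save_record(cursor, cnx, origin, url_, code, info, num_or_type, success=True):
    # success rows fill success_info/success_num; failures fill json_error_info/error_type
    cols = 'success_info,success_num' if success else 'json_error_info,error_type'
    sql = ("insert into WeixinGZH (site_name,site_url,info_source_code,"
           f"{cols},create_time) values (%s,%s,%s,%s,%s,%s)")
    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    cursor.execute(sql, (origin, url_, code, info, num_or_type, now))
    cnx.commit()
```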