提交 475c1e2c 作者: 薛凌堃

微信公众号

上级 db2408bf
......@@ -93,7 +93,7 @@ def flushAndGetToken(list_b):
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
def get_info(sid,json_search,origin,info_source_code):
def get_info(sid,json_search,origin,url_,info_source_code,page):
list_all_info = []
num_caiji = 0
kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -103,7 +103,7 @@ def get_info(sid,json_search,origin,info_source_code):
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
list_all_news = json_search['app_msg_list']
#采集第几篇文章
for one_news in list_all_news:
news_title = one_news['title']
timestamp = one_news['create_time']
......@@ -114,10 +114,12 @@ def get_info(sid,json_search,origin,info_source_code):
url_ft = check_url(sid, url_news)
if url_ft:
log.info(f'已采过该篇文章----文章链接-----{url_news}')
log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
return list_all_info,num_caiji
try:
res_news = requests.get(url_news, timeout=20)
ip = get_proxy()[random.randint(0, 3)]
res_news = requests.get(url_news, timeout=20,proxies=ip)
time.sleep(2)
except:
continue
soup_news = BeautifulSoup(res_news.content, 'html.parser')
......@@ -132,16 +134,17 @@ def get_info(sid,json_search,origin,info_source_code):
try:
news_content = news_html.text
except:
log.info(f'--------内容为空--------{url_news}--------')
log.info(f'----{origin}--第{page}页--该篇文章内容为空--{url_news}-----')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
news_title,
url_news,
news_html,
url_,
info_source_code,
'文章内容为空',
time_now
time_now,
url_news
]
insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,error_type,create_time,news_url) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
continue
......@@ -182,6 +185,8 @@ def get_info(sid,json_search,origin,info_source_code):
section.name = 'div'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
#将信息传输到kafka中
dic_info = {
'sid': sid,
'title': news_title,
......@@ -207,6 +212,7 @@ def get_info(sid,json_search,origin,info_source_code):
continue
num_caiji = num_caiji + 1
list_all_info.append(dic_info)
time.sleep(5)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info2 = {
'infoSourceId': sid,
......@@ -242,7 +248,7 @@ def job(count,key):
getFromSql()
time.sleep(20)
log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
return
return count
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
# '一带一路百人论坛'
......@@ -282,10 +288,35 @@ def job(count,key):
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return
return count
fakeid = biz + '=='
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
#获取页数
try:
ip = get_proxy()[random.randint(0, 3)]
json_search = s.get(url_search, headers=headers, proxies=ip,
verify=False).json()
time.sleep(1)
except Exception as e:
log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
rePutIntoR(info_source_code)
return count
try:
Max_data = int(json_search['app_msg_cnt'])
Max_page = int(int(json_search['app_msg_cnt']) / 5)
if int(json_search['app_msg_cnt']) % 5 != 0:
Max_page = Max_page + 1
else:
Max_page = Max_page
except:
Max_page = 1
Max_data = 5
log.info(f'开始采集{origin}-----共{Max_page}页---{Max_data}条数据-----')
for i in range(0, Max_data, 5):
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
try:
ip = get_proxy()[random.randint(0, 3)]
......@@ -297,7 +328,7 @@ def job(count,key):
except Exception as e:
log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
rePutIntoR(info_source_code)
return
return count
#{"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
# TODO:需要判断返回值,根据返回值判断是封号还是biz错误
# {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
......@@ -321,7 +352,7 @@ def job(count,key):
r.set(key, 50)
r.expire(key, 3600)
return
return count
elif ret == 200002:
# 公众号链接错误 保存库里 记录错误信息及错误类型
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -337,7 +368,7 @@ def job(count,key):
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
return
return count
elif ret == 200003:
# 无效的session
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -353,7 +384,7 @@ def job(count,key):
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
return
return count
else:
log.info(f'----其他情况-----{json_search}---公众号{origin}------')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -368,12 +399,15 @@ def job(count,key):
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return
return count
list_all = json_search['app_msg_list']
try:
list_all_info,num_caiji= get_info(sid,json_search,origin,info_source_code)
print(f'----------{len(list_all_info)}------{num_caiji}-------')
#开始采集每一页文章信息
page = int(i/5+1)
log.info(f'---{origin}---------开始采集第{page}个分页-----------')
list_all_info,num_caiji= get_info(sid,json_search,origin,url_,info_source_code,page)
print(f'----第{page}页采集到文章个数-----{len(list_all_info)}------{num_caiji}-------')
time.sleep(2)
if len(list_all_info) != 0:
count += 1
......@@ -384,19 +418,20 @@ def job(count,key):
info_source_code,
'采集成功',
num_caiji,
time_now
time_now,
]
#成功信息保存
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(success))
cnx_.commit()
# 该公众号的所有文章采集完成
log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_,time.time())}')
log.info(f'---第{page}页采集到文章个数---{len(list_all_info)}---{num_caiji}---耗时{baseCore.getTimeCost(start_,time.time())}')
else:
log.info(f'{fakeid}、公众号{origin}、网址已存在!耗时{baseCore.getTimeCost(start_,time.time())}')
log.info(f'----第{page}页采集到文章个数{num_caiji}--网址已存在!-----耗时{baseCore.getTimeCost(start_,time.time())}')
return count
except Exception as e:
# json解析该公众号成功但采集数据失败
count += 1
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
origin,
......@@ -411,12 +446,12 @@ def job(count,key):
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
count += 1
log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_, time.time())}')
time.sleep(2)
return count
if __name__=="__main__":
requests.DEFAULT_RETRIES = 5
time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -463,8 +498,12 @@ if __name__=="__main__":
new_value = baseCore.incrSet(key)
baseCore.getttl(key)
if new_value < 50:
try:
aa = job(count,key)
count = aa
time.sleep(20)
except:
time.sleep(10)
else:
#刷新浏览器
browser_run = list_b[0]
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论