Commit d66eb70e  Author: 薛凌堃

微信公众号 (WeChat official accounts)

Parent 2744901e
@@ -241,7 +241,6 @@ class BaseCore:
        except :
            pass
    # 计算耗时
    def getTimeCost(self,start, end):
        seconds = int(end - start)
@@ -393,7 +392,7 @@ class BaseCore:
    # 从Redis的List中获取并移除一个元素
    def redicPullData(self,key):
-        item = self.r.lpop(key)
+        item = self.r.rpop(key)
        return item.decode() if item else None
    # 获得脚本进程PID
@@ -494,4 +493,27 @@ class BaseCore:
    def rePutIntoR(self,item):
        self.r.rpush('NewsEnterprise:gwqy_socialCode', item)
+    #增加计数器的值并返回增加后的值
+    def incrSet(self,key):
+        # 增加计数器的值并返回增加后的值
+        new_value = self.r.incr(key)
+        print("增加后的值:", new_value)
+        return new_value
+    #获取key剩余的过期时间
+    def getttl(self,key):
+        # 获取key的剩余过期时间
+        ttl = self.r.ttl(key)
+        print("剩余过期时间:", ttl)
+        # 判断key是否已过期
+        if ttl < 0:
+            # key已过期,将key的值重置为0
+            self.r.set(key, 0)
+            self.r.expire(key, 3600)
+        time.sleep(2)
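The two new helpers are used together as a one-hour quota: incrSet bumps a Redis counter each time an account is crawled, and getttl resets the counter once its expiry has lapsed. A minimal standalone sketch of the same pattern, assuming a reachable Redis instance (the host, limit and window values here are illustrative, not the project's):

```python
import time
import redis

r = redis.Redis(host="localhost", port=6379)  # illustrative connection, not the project's

def allow_request(key: str, limit: int = 50, window: int = 3600) -> bool:
    """Increment the counter for `key` and report whether it is still under `limit`."""
    count = r.incr(key)        # INCR creates the key at 1 if it does not exist yet
    if count == 1:
        r.expire(key, window)  # start the window on first use
    if r.ttl(key) < 0:         # key lost its expiry: reset the window, as getttl() does above
        r.set(key, 0)
        r.expire(key, window)
        time.sleep(2)
    return count < limit
```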
@@ -118,7 +118,7 @@ def AnnualEnterprise_task():
#企业基本信息
def BaseInfoEnterprise():
    # 获取国内企业
-    gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 1 "
+    gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 10 "
    cursor.execute(gn_query)
    gn_result = cursor.fetchall()
    gn_social_list = [item[0] for item in gn_result]
@@ -165,32 +165,35 @@ def weixin_task():
##福布斯=====从数据库中读取信息放入redis
def FBS():
    # todo:调整为获取福布斯的数据库
-    gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
-    cursor.execute(gw_query)
-    gw_result = cursor.fetchall()
-    # #获取国内企业
-    gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
+    # gw_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=2"
+    # cursor.execute(gw_query)
+    # gw_result = cursor.fetchall()
+    #获取国内企业
+    gn_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=1"
    cursor.execute(gn_query)
    gn_result = cursor.fetchall()
    gn_social_list = [item[0] for item in gn_result]
-    gw_social_list = [item[0] for item in gw_result]
-    for item in gw_social_list:
-        r.rpush('NewsEnterprise:gwqy_socialCode', item)
+    # gw_social_list = [item[0] for item in gw_result]
+    #
+    # for item in gw_social_list:
+    #     r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
    for item in gn_social_list:
-        r.rpush('NewsEnterprise:gnqy_socialCode', item)
+        if not r.exists(item):
+            r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)

if __name__ == "__main__":
    start = time.time()
    # NewsEnterprise_task()
    # NewsEnterprise()
-    FBS()
+    # BaseInfoEnterprise()
+    # FBS()
    # NoticeEnterprise_task()
    # AnnualEnterprise_task()
+    NoticeEnterprise()
    log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
    # cnx.close()
    # cursor.close()
......
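FBS() is the producing side of a simple work queue: the selected social codes go into a Redis list with RPUSH, and the crawler workers drain it through redicPullData() (now RPOP). A minimal sketch of that handoff, reusing the queue name from the code above; the connection settings are placeholders, not the project's:

```python
import redis

r = redis.Redis(host="localhost", port=6379)  # placeholder connection

QUEUE = "NewsEnterpriseFbs:gnqy_socialCode"

def enqueue(codes):
    """Push each code once; the r.exists() guard mirrors the duplicate check added in FBS()."""
    for code in codes:
        if not r.exists(code):
            r.rpush(QUEUE, code)

def dequeue():
    """Pop one code from the queue; returns None when the queue is empty."""
    item = r.rpop(QUEUE)
    return item.decode() if item else None
```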
# -*- coding: utf-8 -*-
'''
-记录一天能采多少公众号,建一个数据库表 更新公众号的状态
+记录一天能采多少公众号,建一个数据库表 更新公众号的状态 十分钟能采50多个
'''
import requests, time, random, json, pymysql, redis
@@ -25,16 +25,14 @@ cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
def check_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    res = r.sismember(f'wx_url_{sid}',article_url)
-    if res == 1:
+    if res == True:
        return True
    else:
        return False
def add_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    res = r.sadd(f'wx_url_{sid}', article_url, 3)  # 注意是 保存set的方式
@@ -43,7 +41,6 @@ def add_url(sid, article_url):
    else:
        return False
def get_proxy():
    cnx = pymysql.connect(host="114.115.159.144", user="root", password="zzsn9988", db="clb_project", charset="utf8mb4")
    with cnx.cursor() as cursor:
@@ -67,8 +64,37 @@ def get_proxy():
        proxy_list.append(proxy)
    return proxy_list
+#定时
+def getFromSql():
+    selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
+    cursor.execute(selectSql)
+    results = cursor.fetchall()
+    result_list = [item[0] for item in results]
+    #放入redis
+    for item in result_list:
+        r.rpush('WeiXinGZH:infoSourceCode', item)
+#刷新浏览器并获得token
+def flushAndGetToken(list_b):
+    browser_run = list_b[0]
+    log.info('======刷新浏览器=====')
+    browser_run.refresh()
+    cookie_list = browser_run.get_cookies()
+    cur_url = browser_run.current_url
+    token = cur_url.split('token=')[1]
+    log.info(f'===========当前token为:{token}============')
+    cookies = {}
+    for cookie in cookie_list:
+        cookies[cookie['name']] = cookie['value']
+    return token,cookies
+#采集失败的公众号 重新放入redis
+def rePutIntoR(item):
+    r.rpush('WeiXinGZH:infoSourceCode', item)
-def get_info(sid,json_search):
+def get_info(sid,json_search,origin,info_source_code):
+    list_all_info = []
    num_caiji = 0
    kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    obsClient = ObsClient(
@@ -88,6 +114,7 @@ def get_info(sid,json_search):
            url_ft = check_url(sid, url_news)
            if url_ft:
+                log.info(f'已采过该篇文章----文章链接-----{url_news}')
                return list_all_info,num_caiji
            try:
                res_news = requests.get(url_news, timeout=20)
@@ -173,14 +200,13 @@ def get_info(sid,json_search):
                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
                kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
                kafka_time_out = kafka_result.get(timeout=10)
-                # add_url(sid, url_news)
+                add_url(sid, url_news)
                break
            except:
                time.sleep(5)
                continue
        num_caiji = num_caiji + 1
        list_all_info.append(dic_info)
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        dic_info2 = {
            'infoSourceId': sid,
@@ -202,39 +228,199 @@
        continue
    return list_all_info,num_caiji
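Inside get_info() each article is serialized to JSON, pushed to the crawlerInfo Kafka topic and, now that add_url() is enabled again, its URL is recorded in a per-account Redis set so check_url() skips it on later runs. A condensed sketch of that send-then-mark step, assuming a reachable broker; the hosts, retry count and helper name are illustrative rather than taken from the commit:

```python
import json
import redis
from kafka import KafkaProducer

r = redis.Redis(host="localhost", port=6379)                     # placeholder connection
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])   # placeholder broker

def publish_article(sid: str, url_news: str, dic_info: dict, retries: int = 3) -> bool:
    """Send one article dict to Kafka, then mark its URL as collected for this account."""
    if r.sismember(f"wx_url_{sid}", url_news):   # already collected earlier: skip
        return False
    for _ in range(retries):
        try:
            future = producer.send("crawlerInfo",
                                   json.dumps(dic_info, ensure_ascii=False).encode("utf8"))
            future.get(timeout=10)               # raises if the broker did not acknowledge
            r.sadd(f"wx_url_{sid}", url_news)    # only mark as seen after a successful send
            return True
        except Exception:
            continue
    return False
```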
-#定时
-def getFromSql():
-    selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
-    cursor.execute(selectSql)
-    results = cursor.fetchall()
-    result_list = [item[0] for item in results]
-    #放入redis
-    for item in result_list:
-        r.rpush('WeiXinGZH:infoSourceCode', item)
-#刷新浏览器并获得token
-def flushAndGetToken(list_b):
-    browser_run = list_b[0]
-    log.info('======刷新浏览器=====')
-    browser_run.refresh()
-    cookie_list = browser_run.get_cookies()
-    cur_url = browser_run.current_url
-    token = cur_url.split('token=')[1]
-    log.info(f'===========当前token为:{token}============')
-    cookies = {}
-    for cookie in cookie_list:
-        cookies[cookie['name']] = cookie['value']
-    return token,cookies
-#采集失败的公众号 重新放入redis
-def rePutIntoR(item):
-    r.rpush('WeiXinGZH:infoSourceCode', item)
+def job(count,key):
+    # 刷新浏览器并获取当前token和cookie
+    token, cookies = flushAndGetToken(list_b)
+    log.info('===========获取公众号============')
+    start_ = time.time()
+    # #todo:redis中数据 pop一条
+    infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
+    if infoSourceCode == 'None':
+        #当一次采集完之后,重新插入数据并等待插入完成
+        getFromSql()
+        time.sleep(20)
+        log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
+        return
+    sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
+    # '一带一路百人论坛'
+    # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where site_name = '一带一路百人论坛' "
+    cursor.execute(sql)
+    row = cursor.fetchone()
+    dic_url = {
+        'url_': row[0],
+        'sid': row[1],
+        'name': row[2],
+        'info_source_code': row[3],
+        'biz': ''
+    }
+    log.info('===========获取biz==========')
+    s.cookies.update(cookies)
+    s.keep_alive = False
+    url_ = dic_url['url_']
+    origin = dic_url['name']
+    info_source_code = dic_url['info_source_code']
+    sid = dic_url['sid']
+    try:
+        biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
+        dic_url['biz'] = biz
+    except Exception as e:
+        log.info(f'---公众号--{origin}---biz错误')
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            e,
+            'biz错误',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
+        return
+    fakeid = biz + '=='
+    url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
+    # https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
+    try:
+        ip = get_proxy()[random.randint(0, 3)]
+        json_search = s.get(url_search, headers=headers, proxies=ip,
+                            verify=False).json()  # , proxies=ip, verify=False
+        str_t = json.dumps(json_search)
+        time.sleep(2)
+    except Exception as e:
+        log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
+        rePutIntoR(info_source_code)
+        return
+    #{"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
+    # TODO:需要判断返回值,根据返回值判断是封号还是biz错误
+    # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
+    # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
+    # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
+    ret = json_search['base_resp']['ret']
+    if ret == 0:
+        pass
+    elif ret == 200013:
+        # 重新放入redis
+        # time.sleep(3600)
+        # 刷新 暂时用一下方法
+        rePutIntoR(info_source_code)
+        log.info(f'======该账号被封=======')
+        # for i in range(0,6): #600,1200,1800,2400,3000,3600
+        #     #刷新
+        #     time.sleep(600)
+        #     log.info('=======等待时间600秒=====刷新浏览器=====')
+        #     browser_run = list_b[0]
+        #     browser_run.refresh()
+        r.set(key, 50)
+        r.expire(key, 3600)
+        return
+    elif ret == 200002:
+        # 公众号链接错误 保存库里 记录错误信息及错误类型
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            str_t,
+            '无效biz参数',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
+        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
+        return
+    elif ret == 200003:
+        # 无效的session
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            str_t,
+            '无效session',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
+        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
+        return
+    else:
+        log.info(f'----其他情况-----{json_search}---公众号{origin}------')
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            str_t,
+            '其他错误',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
+        return
+    list_all = json_search['app_msg_list']
+    try:
+        list_all_info,num_caiji= get_info(sid,json_search,origin,info_source_code)
+        print(f'----------{len(list_all_info)}------{num_caiji}-------')
+        time.sleep(2)
+        if len(list_all_info) != 0:
+            count += 1
+            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+            success = [
+                origin,
+                url_,
+                info_source_code,
+                '采集成功',
+                num_caiji,
+                time_now
+            ]
+            #成功信息保存
+            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
+            cursor_.execute(insertSql, tuple(success))
+            cnx_.commit()
+            # 该公众号的所有文章采集完成
+            log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_,time.time())}')
+        else:
+            log.info(f'{fakeid}、公众号{origin}、网址已存在!耗时{baseCore.getTimeCost(start_,time.time())}')
+    except Exception as e:
+        # json解析该公众号成功但采集数据失败
+        count += 1
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        false = [
+            origin,
+            url_,
+            info_source_code,
+            e,
+            '采集失败',
+            time_now
+        ]
+        # 失败信息保存
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(false))
+        cnx_.commit()
+        log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
+    time.sleep(2)
+    return count
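job() branches on base_resp.ret from the appmsg list endpoint: 0 means the article list can be parsed, 200013 (freq control) requeues the account and saturates the hourly counter so the main loop backs off, while 200002 and 200003 are written to the WeixinGZH table as an invalid biz parameter or an expired session. A minimal classification helper restating that mapping (the function name and return labels are illustrative, not part of the commit):

```python
def classify_ret(ret: int) -> str:
    """Map base_resp.ret from the WeChat appmsg endpoint to the action taken in job()."""
    if ret == 0:
        return "ok"               # parse app_msg_list normally
    if ret == 200013:
        return "freq_control"     # throttled: requeue the account and fill the hourly counter
    if ret == 200002:
        return "invalid_biz"      # bad biz parameter: log to WeixinGZH as 无效biz参数
    if ret == 200003:
        return "invalid_session"  # login token expired: log as 无效session
    return "other_error"          # anything else: log the raw JSON as 其他错误
```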
if __name__=="__main__":
    requests.DEFAULT_RETRIES = 5
    time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-    print(f'开始时间为:{time_start}')
+    log.info(f'开始时间为:{time_start}')
    requests.adapters.DEFAULT_RETRIES = 3
    headers = {
@@ -258,199 +444,40 @@ if __name__=="__main__":
    list_b = [browser1]
    url = "https://mp.weixin.qq.com/"
    browser1.get(url)
    # 可改动
    time.sleep(30)
    num_b = 0
+    # 记录运行公众号的个数
+    count = 0
    # todo:从数据库里读信息,放入redis,定时任务 每天放入数据
    # getFromSql()
    s = requests.session()
-    # 记录运行公众号的个数
-    count = 0
+    # 设置计数器的初始值为0
+    key = baseCore.getNextSeq()
+    r.set(key, 0)
+    # 设置key的过期时间为10秒
+    r.expire(key, 3600)
    while True:
-        # 刷新浏览器并获取当前token和cookie
-        token, cookies = flushAndGetToken(list_b)
-        list_all_info = []
-        log.info('===========获取公众号============')
-        start_ = time.time()
-        # #todo:redis中数据 pop一条
-        infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
-        if infoSourceCode == 'None':
-            #当一次采集完之后,重新插入数据并等待插入完成
-            getFromSql()
-            time.sleep(20)
-            log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
-            continue
-        sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
-        # '一带一路百人论坛'
-        # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where site_name = '一带一路百人论坛' "
-        cursor.execute(sql)
-        row = cursor.fetchone()
-        dic_url = {
-            'url_': row[0],
-            'sid': row[1],
-            'name': row[2],
-            'info_source_code': row[3],
-            'biz': ''
-        }
-        log.info('===========获取biz==========')
-        s.cookies.update(cookies)
-        s.keep_alive = False
-        url_ = dic_url['url_']
-        origin = dic_url['name']
-        info_source_code = dic_url['info_source_code']
-        sid = dic_url['sid']
-        try:
-            biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
-            dic_url['biz'] = biz
-        except Exception as e:
-            log.info(f'---公众号--{origin}---biz错误')
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                e,
-                'biz错误',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            continue
-        fakeid = biz + '=='
-        url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=5&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
-        try:
-            ip = get_proxy()[random.randint(0, 3)]
-            json_search = s.get(url_search, headers=headers, proxies=ip,
-                                verify=False).json()  # , proxies=ip, verify=False
-            str_t = json.dumps(json_search)
-            time.sleep(2)
-        except Exception as e:
-            log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
-            rePutIntoR(info_source_code)
-            continue
-        #{"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
-        # TODO:需要判断返回值,根据返回值判断是封号还是biz错误
-        # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
-        # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
-        # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
-        ret = json_search['base_resp']['ret']
-        if ret == 0:
-            pass
-        elif ret == 200013:
-            # 重新放入redis
-            # time.sleep(3600)
-            # 刷新 暂时用一下方法
-            rePutIntoR(info_source_code)
-            log.info(f'======该账号被封=======')
-            for i in range(0,6): #600,1200,1800,2400,3000,3600
-                #刷新
-                wait_time = time.sleep(600)
-                log.info(f'=======等待时间{wait_time}秒=====刷新浏览器=====')
-                browser_run = list_b[0]
-                browser_run.refresh()
-            continue
-        elif ret == 200002:
-            # 公众号链接错误 保存库里 记录错误信息及错误类型
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                str_t,
-                '无效biz参数',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
-            continue
-        elif ret == 200003:
-            # 无效的session
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                str_t,
-                '无效session',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
-            continue
-        else:
-            log.info(f'----其他情况-----{json_search}---公众号{origin}------')
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                str_t,
-                '其他错误',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            continue
-        list_all = json_search['app_msg_list']
-        try:
-            list_all_info,num_caiji= get_info(sid,json_search)
-            time.sleep(2)
-            if len(list_all_info) != 0:
-                count += 1
-                time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-                success = [
-                    origin,
-                    url_,
-                    info_source_code,
-                    '采集成功',
-                    num_caiji,
-                    time_now
-                ]
-                #成功信息保存
-                insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
-                cursor_.execute(insertSql, tuple(success))
-                cnx_.commit()
-                # 该公众号的所有文章采集完成
-                log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_,time.time())}')
-            else:
-                log.info(f'{fakeid}、公众号{origin}、网址已存在!耗时{baseCore.getTimeCost(start_,time.time())}')
-        except Exception as e:
-            # json解析该公众号成功但采集数据失败
-            count += 1
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            false = [
-                origin,
-                url_,
-                info_source_code,
-                e,
-                '采集失败',
-                time_now
-            ]
-            # 失败信息保存
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(false))
-            cnx_.commit()
-            log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
-        time.sleep(2)
+        time.sleep(2)
+        new_value = baseCore.incrSet(key)
+        baseCore.getttl(key)
+        if new_value < 50:
+            aa = job(count,key)
+            count = aa
+        else:
+            #刷新浏览器
+            browser_run = list_b[0]
+            browser_run.refresh()
+            time.sleep(600)
-    #关闭资源
+    # 关闭资源
    cnx.close()
    cursor.close()
    baseCore.close()
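The rewritten main loop swaps the old inline crawl for a counter-gated scheduler: each pass bumps the hourly key via incrSet(), runs job() while the count stays under 50, and otherwise refreshes the logged-in browser and sleeps. A stripped-down sketch of that control flow with a stubbed job so it runs on its own (the key name, limit and sleep values are illustrative):

```python
import time
import redis

r = redis.Redis(host="localhost", port=6379)    # placeholder connection

def job(count: int) -> int:
    """Stand-in for the real job(): crawl one account and return the updated count."""
    return count + 1

def run(key: str = "wx:hourly", limit: int = 50) -> None:
    count = 0
    r.set(key, 0)
    r.expire(key, 3600)                          # one-hour window, as in the commit
    while True:
        time.sleep(2)
        if r.incr(key) < limit:                  # under quota: crawl another account
            count = job(count)
        else:                                    # quota reached (or account throttled): back off
            time.sleep(600)
```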