Commit 7bada3ac  Author: LiuLiYuan

Merge remote-tracking branch 'origin/master'

@@ -17,14 +17,15 @@ import langid
 # Note: call BaseCore.close() before the program exits to release the related resources
 class BaseCore:
     # sequence number
     __seq = 0
     # proxy pool / database connections
     __cnx_proxy =None
     __cursor_proxy = None
+    cnx = None
+    cursor = None
+    r = None
     # agent pool
     __USER_AGENT_LIST = [
         'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
@@ -392,7 +393,7 @@ class BaseCore:
     # get and remove one element from a Redis list
     def redicPullData(self,key):
-        item = self.r.rpop(key)
+        item = self.r.lpop(key)
         return item.decode() if item else None
     # get the PID of the script process
@@ -480,7 +481,7 @@ class BaseCore:
     def writerToExcel(self,detailList,filename):
         # filename='baidu搜索.xlsx'
         # read the existing xlsx file
-        existing_data = pd.read_excel(filename,engine='openpyxl')
+        existing_data = pd.read_excel(filename,engine='openpyxl',dtype=str)
         # create the new data
         new_data = pd.DataFrame(data=detailList)
         # append the new data to the end of the existing data
......
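A side note on the `rpop` → `lpop` change in `redicPullData`: tasks are queued with `rpush`, so popping from the left makes the Redis list behave as a FIFO queue instead of a LIFO stack. A minimal illustration, assuming a hypothetical local Redis instance rather than the project's servers:

```python
import redis

# rpush appends at the tail; lpop removes from the head, so items are
# consumed in the order they were queued (FIFO). rpop would return the
# most recently pushed item first (LIFO).
r = redis.Redis(host="localhost", port=6379, db=0)
r.rpush("demo:queue", "a", "b", "c")
print(r.lpop("demo:queue"))  # b'a'  (oldest item)
print(r.rpop("demo:queue"))  # b'c'  (newest item)
```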
 import time
+import pymysql
 from base import BaseCore
 from apscheduler.schedulers.blocking import BlockingScheduler
 basecore = BaseCore.BaseCore()
 log = basecore.getLogger()
+# database on the .144 host
 cnx = basecore.cnx
 cursor = basecore.cursor
 r = basecore.r
+# database on the .11 host
+cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cursor_ = cnx_.cursor()
 # # connect to Redis
 # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
 #
@@ -32,9 +39,9 @@ r = basecore.r
 # company news
 def NewsEnterprise():
     # # get domestic companies
-    # gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
-    # cursor.execute(gn_query)
-    # gn_result = cursor.fetchall()
+    gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
+    cursor.execute(gn_query)
+    gn_result = cursor.fetchall()
     # get foreign companies
     gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
     cursor.execute(gw_query)
@@ -42,20 +49,20 @@ def NewsEnterprise():
     gw_social_list = [item[0] for item in gw_result]
     # todo: print the length
-    print(len(gw_social_list))
-    # gn_social_list = [item[0] for item in gn_result]
+    # print(len(gw_social_list))
+    gn_social_list = [item[0] for item in gn_result]
     print('=======')
     # insert the data into redis
     # for item in gn_social_list:
     #     r.rpush('NewsEnterprise:gnqy_socialCode', item)
-    count = 0
+    # count = 0
     for item in gw_social_list:
         r.rpush('NewsEnterprise:gwqy_socialCode', item)
-        count+=1
-        print(item)
-    print(count)
+        # count+=1
+        # print(item)
+    # print(count)
 # scheduled task for company news
 def NewsEnterprise_task():
     # instantiate a scheduler
@@ -140,6 +147,29 @@ def BaseInfoEnterprise_task():
         print('定时采集异常', e)
         pass
+# financial data from Eastmoney (东方财富网)
+def FinanceFromEast():
+    # read data from the listed-company (IPO) table
+    sql_sel = '''select social_credit_code from sys_base_enterprise_ipo where category = '1' limit 10 '''
+    cursor_.execute(sql_sel)
+    finance = cursor_.fetchall()
+    finance_list = [item[0] for item in finance]
+    print('=======')
+    for item in finance_list:
+        r.rpush('FinanceFromEast:finance_socialCode', item)
+def FinanceFromEase_task():
+    # instantiate a scheduler
+    scheduler = BlockingScheduler()
+    # run once every quarter
+    scheduler.add_job(FinanceFromEast, 'cron', month='1-12/3', day='1',hour=0, minute=0)
+    try:
+        # redisPushData  # run once before the schedule starts
+        scheduler.start()
+    except Exception as e:
+        print('定时采集异常', e)
+        pass
 # WeChat official accounts
 def WeiXingetFromSql():
     selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
@@ -207,9 +237,6 @@ def FBS():
         r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
 if __name__ == "__main__":
     start = time.time()
     # NoticeEnterprise()
@@ -222,8 +249,9 @@ if __name__ == "__main__":
     # FBS()
     # NoticeEnterprise_task()
     # AnnualEnterprise_task()
-    NoticeEnterprise()
+    # NoticeEnterprise()
+    FinanceFromEast()
     log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
-    # cnx.close()
-    # cursor.close()
+    cnx_.close()
+    cursor_.close()
     # basecore.close()
@@ -41,30 +41,6 @@ def add_url(sid, article_url):
     else:
         return False
-def get_proxy():
-    cnx = pymysql.connect(host="114.115.159.144", user="root", password="zzsn9988", db="clb_project", charset="utf8mb4")
-    with cnx.cursor() as cursor:
-        sql = "select proxy from clb_proxy"
-        cursor.execute(sql)
-        proxy_lists = cursor.fetchall()
-        ip_list = []
-        for proxy_ in proxy_lists:
-            ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
-        proxy_list = []
-        for str_ip in ip_list:
-            str_ip_list = str_ip.split('-')
-            proxyMeta = "http://%(host)s:%(port)s" % {
-                "host": str_ip_list[0],
-                "port": str_ip_list[1],
-            }
-            proxy = {
-                "HTTP": proxyMeta,
-                "HTTPS": proxyMeta
-            }
-            proxy_list.append(proxy)
-        return proxy_list
-# scheduled
 def getFromSql():
     selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
     cursor.execute(selectSql)
@@ -93,7 +69,7 @@ def flushAndGetToken(list_b):
 def rePutIntoR(item):
     r.rpush('WeiXinGZH:infoSourceCode', item)
-def get_info(sid,json_search,origin,info_source_code):
+def get_info(sid,json_search,origin,url_,info_source_code,page):
     list_all_info = []
     num_caiji = 0
     kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -103,7 +79,7 @@ def get_info(sid,json_search,origin,info_source_code):
         server='https://obs.cn-north-1.myhuaweicloud.com'  # your bucket's address
     )
     list_all_news = json_search['app_msg_list']
+    # which article of this page is being collected
     for one_news in list_all_news:
         news_title = one_news['title']
         timestamp = one_news['create_time']
@@ -114,10 +90,12 @@ def get_info(sid,json_search,origin,info_source_code):
         url_ft = check_url(sid, url_news)
         if url_ft:
-            log.info(f'已采过该篇文章----文章链接-----{url_news}')
+            log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
             return list_all_info,num_caiji
         try:
-            res_news = requests.get(url_news, timeout=20)
+            ip = baseCore.get_proxy()
+            res_news = requests.get(url_news, timeout=20,proxies=ip)
+            time.sleep(2)
         except:
             continue
         soup_news = BeautifulSoup(res_news.content, 'html.parser')
@@ -132,16 +110,17 @@ def get_info(sid,json_search,origin,info_source_code):
         try:
             news_content = news_html.text
         except:
-            log.info(f'--------内容为空--------{url_news}--------')
+            log.info(f'----{origin}--第{page}页--该篇文章内容为空--{url_news}-----')
             time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
             false = [
                 news_title,
-                url_news,
-                news_html,
+                url_,
+                info_source_code,
                 '文章内容为空',
-                time_now
+                time_now,
+                url_news
             ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
+            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,error_type,create_time,news_url) values (%s,%s,%s,%s,%s,%s)"
             cursor_.execute(insertSql, tuple(false))
             cnx_.commit()
             continue
@@ -182,6 +161,8 @@ def get_info(sid,json_search,origin,info_source_code):
             section.name = 'div'
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        # send the info to kafka
         dic_info = {
             'sid': sid,
             'title': news_title,
@@ -196,8 +177,8 @@ def get_info(sid,json_search,origin,info_source_code):
             'createDate': time_now
         }
         for nnn in range(0, 3):
-            try:
             producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+            try:
                 kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
                 kafka_time_out = kafka_result.get(timeout=10)
                 add_url(sid, url_news)
@@ -205,8 +186,11 @@ def get_info(sid,json_search,origin,info_source_code):
             except:
                 time.sleep(5)
                 continue
+            finally:
+                producer.close()
         num_caiji = num_caiji + 1
         list_all_info.append(dic_info)
+        time.sleep(5)
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
         dic_info2 = {
             'infoSourceId': sid,
@@ -235,18 +219,18 @@ def job(count,key):
     log.info('===========获取公众号============')
     start_ = time.time()
-    # # todo: pop one record from the redis data
+    # todo: pop one record from the redis data
     infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
-    if infoSourceCode == 'None':
+    if infoSourceCode == 'None' or infoSourceCode == None:
         # after one full pass, re-insert the data and wait for the insert to finish
         getFromSql()
         time.sleep(20)
         log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
-        return
+        return count
     sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
     # '一带一路百人论坛'
-    # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where site_name = '一带一路百人论坛' "
+    # sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
     cursor.execute(sql)
     row = cursor.fetchone()
@@ -282,28 +266,28 @@ def job(count,key):
         insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
-        return
+        return count
     fakeid = biz + '=='
     url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
-    # https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
+    # get the number of pages
     try:
-        ip = get_proxy()[random.randint(0, 3)]
-        json_search = s.get(url_search, headers=headers, proxies=ip,
+        # ip = baseCore.get_proxy()
+        json_search = s.get(url_search, headers=headers,
                             verify=False).json()  # , proxies=ip, verify=False
         str_t = json.dumps(json_search)
-        time.sleep(2)
+        time.sleep(1)
     except Exception as e:
         log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
         rePutIntoR(info_source_code)
-        return
-    #{"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
-    ret = json_search['base_resp']['ret']
+        time.sleep(20)
+        return count
+    # {"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
    # TODO: check the return value to tell whether the account was banned or the biz is wrong
    # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}} ========= account banned
    # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} wrong official-account biz / link
    # 'base_resp': {'err_msg': 'ok', 'ret': 0} normal
+    ret = json_search['base_resp']['ret']
     if ret == 0:
         pass
     elif ret == 200013:
@@ -318,10 +302,9 @@ def job(count,key):
         # log.info('=======等待时间600秒=====刷新浏览器=====')
         # browser_run = list_b[0]
         # browser_run.refresh()
         r.set(key, 50)
-        r.expire(key, 3600)
-        return
+        r.expire(key, 5400)
+        return count
     elif ret == 200002:
         # wrong official-account link: save it to the DB and record the error info and type
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -336,8 +319,8 @@ def job(count,key):
         insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
-        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
-        return
+        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
+        return count
     elif ret == 200003:
         # invalid session
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -353,7 +336,7 @@ def job(count,key):
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
         log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
-        return
+        return count
     else:
         log.info(f'----其他情况-----{json_search}---公众号{origin}------')
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -368,12 +351,41 @@ def job(count,key):
         insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
-        return
+        return count
+    try:
+        Max_data = int(json_search['app_msg_cnt'])
+        Max_page = int(int(json_search['app_msg_cnt']) / 5)
+        if int(json_search['app_msg_cnt']) % 5 != 0:
+            Max_page = Max_page + 1
+        else:
+            Max_page = Max_page
+    except:
+        Max_page = 1
+        Max_data = 5
+    log.info(f'开始采集{origin}-----共{Max_page}页---{Max_data}条数据-----')
+    for i in range(0, Max_data, 5):
+        url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
+        # url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
+        # https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
+        try:
+            # ip = get_proxy()[random.randint(0, 3)]
+            json_search = s.get(url_search, headers=headers,
+                                verify=False).json()  # , proxies=ip, verify=False
+            str_t = json.dumps(json_search)
+            time.sleep(2)
+        except Exception as e:
+            log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
+            rePutIntoR(info_source_code)
+            return count
         list_all = json_search['app_msg_list']
         try:
-            list_all_info,num_caiji= get_info(sid,json_search,origin,info_source_code)
-            print(f'----------{len(list_all_info)}------{num_caiji}-------')
+            # start collecting the article info for this page
+            page = int(i/5+1)
+            log.info(f'---{origin}---------开始采集第{page}个分页-----------')
+            list_all_info,num_caiji= get_info(sid,json_search,origin,url_,info_source_code,page)
+            print(f'----第{page}页采集到文章个数-----{len(list_all_info)}------{num_caiji}-------')
             time.sleep(2)
             if len(list_all_info) != 0:
                 count += 1
@@ -384,19 +396,19 @@ def job(count,key):
                     info_source_code,
                     '采集成功',
                     num_caiji,
-                    time_now
+                    time_now,
                 ]
                 # save the success info
                 insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
                 cursor_.execute(insertSql, tuple(success))
                 cnx_.commit()
                 # all articles of this official account have been collected
-                log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_,time.time())}')
+                log.info(f'---第{page}页采集到文章个数---{len(list_all_info)}---{num_caiji}---耗时{baseCore.getTimeCost(start_,time.time())}')
             else:
-                log.info(f'{fakeid}、公众号{origin}、网址已存在!耗时{baseCore.getTimeCost(start_,time.time())}')
+                log.info(f'----第{page}页采集到文章个数{num_caiji}--网址已存在!-----耗时{baseCore.getTimeCost(start_,time.time())}')
+                return count
         except Exception as e:
             # json for this account parsed successfully but the data collection failed
+            count += 1
             time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
             false = [
                 origin,
@@ -411,12 +423,12 @@ def job(count,key):
             cursor_.execute(insertSql, tuple(false))
             cnx_.commit()
             log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
-    count += 1
-    log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_, time.time())}')
         time.sleep(2)
     return count
 if __name__=="__main__":
     requests.DEFAULT_RETRIES = 5
     time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -456,15 +468,19 @@ if __name__=="__main__":
         # set the counter's initial value to 0
         key = baseCore.getNextSeq()
         r.set(key, 0)
-        # set the key to expire after 10 seconds
-        r.expire(key, 3600)
+        # set the key to expire after an hour and a half
+        r.expire(key, 5400)
         while True:
             time.sleep(2)
             new_value = baseCore.incrSet(key)
             baseCore.getttl(key)
             if new_value < 50:
+                try:
                     aa = job(count,key)
                     count = aa
+                    time.sleep(20)
+                except:
+                    time.sleep(10)
             else:
                 # refresh the browser
                 browser_run = list_b[0]
......
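The `Max_page` computation added above (divide `app_msg_cnt` by the page size of 5, then add one page for any remainder) is ordinary ceiling division; a compact, self-contained equivalent, assuming the response payload has the same shape as in the diff:

```python
import math

PAGE_SIZE = 5
json_search = {'app_msg_cnt': 23}             # example payload shape from the diff
max_data = int(json_search.get('app_msg_cnt', PAGE_SIZE))
max_page = math.ceil(max_data / PAGE_SIZE)    # 23 articles -> 5 pages, same as the if/else above
begins = list(range(0, max_data, PAGE_SIZE))  # begin offsets per request: [0, 5, 10, 15, 20]
print(max_page, begins)
```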
 #coding=utf-8
@@ -266,13 +266,13 @@ class BaiduSpider(object):
                 break
             for detail in lists:
                 publishTag=detail['publishTag']
-                if publishTag:
-                    pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
-                    needDate='2022-01-01 00:00:00'
-                    needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
-                    if pubtime < needTime:
-                        timeFlag = True
-                        break
+                # if publishTag:
+                # pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
+                # needDate='2022-01-01 00:00:00'
+                # needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
+                # if pubtime < needTime:
+                # timeFlag = True
+                # break
                 is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
                 if is_member:
                     continue
@@ -398,7 +398,7 @@ class BaiduSpider(object):
             processitem=self.getProcessitem(bdetail)
             try:
                 self.sendkafka(processitem)
-                self.r.sadd('pybaidu_test_'+self.wordsCode, processitem['sourceAddress'])
+                self.r.sadd('pybaidu_baidu_'+self.wordsCode, processitem['sourceAddress'])
             except Exception as e:
                 self.logger.info("放入kafka失败!")
             # insert into the database
......
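The `sadd` key fix above (`pybaidu_test_` → `pybaidu_baidu_`) matters because the same prefix is used by the `sismember` check earlier in the loop; with mismatched keys, already-sent URLs would never be skipped. A minimal sketch of the dedup pattern, assuming a hypothetical local Redis connection and an example wordsCode:

```python
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
words_code = "KW-20230818-0003"           # example keyword code
seen_key = 'pybaidu_baidu_' + words_code  # must match on both the check and the add

def process(url: str) -> None:
    if r.sismember(seen_key, url):        # skip URLs already pushed downstream
        return
    # ... fetch, parse, send to Kafka ...
    r.sadd(seen_key, url)                 # record only after a successful send

process("https://example.com/news/1")
```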
 # -*- coding: utf-8 -*-
@@ -190,29 +190,7 @@ if __name__ == '__main__':
     while True:
         try:
             codeList=[]
-            codeList.append('KW-20221114-0007')
-            codeList.append('KW-20221114-0006')
-            codeList.append('KW-20221114-0005')
-            codeList.append('KW-20221114-0009')
-            codeList.append('KW-20221114-0011')
-            codeList.append('KW-20221114-0012')
-            codeList.append('KW-20221114-0013')
-            codeList.append('KW-20221114-0014')
-            codeList.append('KW-20221114-0018')
-            codeList.append('KW-20221213-0006')
-            codeList.append('KW-20221114-0008')
-            codeList.append('KW-20221114-0015')
-            codeList.append('KW-20221114-0016')
-            codeList.append('KW-20221114-0017')
-            codeList.append('KW-20221114-0019')
-            codeList.append('KW-20221114-0022')
-            codeList.append('KW-20221114-0023')
-            codeList.append('KW-20221114-0024')
-            codeList.append('KW-20221114-0025')
-            codeList.append('KW-20221114-0026')
-            codeList.append('KW-20221114-0027')
-            codeList.append('KW-20221114-0020')
-            codeList.append('KW-20221114-0021')
+            codeList.append('KW-20230818-0003')
             for codeid in codeList:
                 try:
                     # keymsg=baiduTaskJob.getkafka()
......
@@ -12,6 +12,9 @@ pip install tqdm -i https://pypi.douban.com/simple
 pip install goose3 -i https://mirrors.aliyun.com/pypi/simple
 pip install Beautifulsoup4 -i https://mirrors.aliyun.com/pypi/simple
 pip install langid -i https://mirrors.aliyun.com/pypi/simple/
+pip install jieba -i https://mirrors.aliyun.com/pypi/simple
 selenium==3.141.0
 selenium-wire==5.1.0
......