Commit 70f8ebff  Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

@@ -17,14 +17,15 @@ import langid
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
    # 序列号
    __seq = 0
    # 代理池 数据库连接
    __cnx_proxy = None
    __cursor_proxy = None
    cnx = None
    cursor = None
    r = None
    # agent 池
    __USER_AGENT_LIST = [
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
@@ -392,7 +393,7 @@ class BaseCore:
    # 从Redis的List中获取并移除一个元素
    def redicPullData(self, key):
        item = self.r.lpop(key)
        return item.decode() if item else None

    # 获得脚本进程PID
@@ -480,7 +481,7 @@ class BaseCore:
    def writerToExcel(self, detailList, filename):
        # filename='baidu搜索.xlsx'
        # 读取已存在的xlsx文件
        existing_data = pd.read_excel(filename, engine='openpyxl', dtype=str)
        # 创建新的数据
        new_data = pd.DataFrame(data=detailList)
        # 将新数据添加到现有数据的末尾
......
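
The change above switches redicPullData from rpop to lpop. Since the dispatcher scripts below enqueue with rpush, the rpush/lpop pair turns each Redis list into a FIFO queue (the oldest social code is consumed first). A minimal sketch of that pattern, assuming a reachable Redis instance (host/port/db here are placeholders, not the production values):

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=6)
r.rpush('NewsEnterprise:gnqy_socialCode', 'code_A', 'code_B')  # 生产端:按顺序入队
item = r.lpop('NewsEnterprise:gnqy_socialCode')                # 消费端:取出最早入队的元素
print(item.decode() if item else None)                         # -> 'code_A'
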
import time
import pymysql
from base import BaseCore
from apscheduler.schedulers.blocking import BlockingScheduler

basecore = BaseCore.BaseCore()
log = basecore.getLogger()
cnx = basecore.cnx
cursor = basecore.cursor
r = basecore.r

def cnn11():
    # 11数据库
    cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
    cursor_ = cnx_.cursor()
    return cnx_, cursor_

def close11(cnx_, cursor_):
    cnx_.close()
    cursor_.close()

# # 连接到Redis
# r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
#
@@ -30,11 +40,24 @@ r = basecore.r
# return gn_social_list,gw_social_list

#企业动态
#创建数据库连接
def connectSql():
    cnx = basecore.cnx
    cursor = basecore.cursor
    return cnx, cursor

#关闭数据库连接
def closeSql(cnx, cursor):
    cnx.close()
    cursor.close()
def NewsEnterprise():
    cnx, cursor = connectSql()
    # #获取国内企业
    gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
    cursor.execute(gn_query)
    gn_result = cursor.fetchall()
    #获取国外企业
    gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
    cursor.execute(gw_query)
@@ -42,28 +65,26 @@ def NewsEnterprise():
    gw_social_list = [item[0] for item in gw_result]
    #todo:打印长度
    # print(len(gw_social_list))
    gn_social_list = [item[0] for item in gn_result]
    print('=======')
    #将数据插入到redis中
    for item in gn_social_list:
        r.rpush('NewsEnterprise:gnqy_socialCode', item)
    for item in gw_social_list:
        r.rpush('NewsEnterprise:gwqy_socialCode', item)
    closeSql(cnx, cursor)
#企业动态定时任务
def NewsEnterprise_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
    # 每天执行一次
    scheduler.add_job(NewsEnterprise, 'cron', hour=0, minute=0, max_instances=2)
    try:
        # redisPushData # 定时开始前执行一次
        NewsEnterprise()
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
@@ -71,14 +92,17 @@ def NewsEnterprise_task():

#企业公告
def NoticeEnterprise():
    cnx, cursor = connectSql()
    # 获取国内企业
    gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null "
    cursor.execute(gn_query)
    gn_result = cursor.fetchall()
    gn_social_list = [item[0] for item in gn_result]
    print('=======')
    for item in gn_social_list:
        r.rpush('NoticeEnterprise:gnqy_socialCode', item)
    closeSql(cnx, cursor)

#企业公告定时任务
def NoticeEnterprise_task():
    # 实例化一个调度器
@@ -94,14 +118,16 @@ def NoticeEnterprise_task():

#企业年报
def AnnualEnterprise():
    cnx, cursor = connectSql()
    # 获取国内企业
    gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
    cursor.execute(gn_query)
    gn_result = cursor.fetchall()
    gn_social_list = [item[0] for item in gn_result]
    print('=======')
    for item in gn_social_list:
        r.rpush('AnnualEnterprise:gnqy_socialCode', item)
    closeSql(cnx, cursor)

#企业年报定时任务
def AnnualEnterprise_task():
@@ -110,7 +136,8 @@ def AnnualEnterprise_task():
    # 每年执行一次
    scheduler.add_job(AnnualEnterprise, 'cron', second='*/10')
    try:
        # 定时开始前执行一次
        AnnualEnterprise()
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
@@ -118,23 +145,51 @@ def AnnualEnterprise_task():

#企业基本信息
def BaseInfoEnterprise():
    cnx, cursor = connectSql()
    # 获取国内企业
    gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
    cursor.execute(gn_query)
    gn_result = cursor.fetchall()
    gn_social_list = [item[0] for item in gn_result]
    print('=======')
    for item in gn_social_list:
        r.rpush('BaseInfoEnterprise:gnqy_socialCode', item)
    closeSql(cnx, cursor)

#企业基本信息定时任务
def BaseInfoEnterprise_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
    # 每年执行一次
    scheduler.add_job(BaseInfoEnterprise, 'cron', month=1, day=1, hour=0, minute=0)
    try:
        # 定时开始前执行一次
        BaseInfoEnterprise()
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
        pass

#东方财富网财务数据
def FinanceFromEast():
    cnx_, cursor_ = cnn11()
    #从上市企业库中读取数据
    sql_sel = '''select social_credit_code from sys_base_enterprise_ipo where category = '1' limit 10 '''
    cursor_.execute(sql_sel)
    finance = cursor_.fetchall()
    finance_list = [item[0] for item in finance]
    print('=======')
    for item in finance_list:
        r.rpush('FinanceFromEast:finance_socialCode', item)
    close11(cnx_, cursor_)

def FinanceFromEase_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
    # 每个季度执行一次
    scheduler.add_job(FinanceFromEast, 'cron', month='1-12/3', day='1', hour=0, minute=0)
    try:
        # 定时开始前执行一次
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
@@ -142,29 +197,33 @@ def BaseInfoEnterprise_task():

#微信公众号
def WeiXingetFromSql():
    cnx_, cursor_ = cnn11()
    selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
    cursor_.execute(selectSql)
    results = cursor_.fetchall()
    result_list = [item[0] for item in results]
    #放入redis
    for item in result_list:
        r.rpush('WeiXinGZH:infoSourceCode', item)
    close11(cnx_, cursor_)

#微信公众号定时任务
def weixin_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
    # 每天执行一次
    scheduler.add_job(WeiXingetFromSql, 'cron', hour=0, minute=0)
    try:
        # 定时开始前执行一次
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
        pass

# 企业年报——雪球网
def AnnualEnterpriseXueQ():
    cnx, cursor = connectSql()
    # 获取国内上市企业
    gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null and isIPO = 1 limit 10"
    cursor.execute(gn_query)
@@ -173,8 +232,24 @@ def AnnualEnterpriseIPO():
    print('=======')
    for item in gn_social_list:
        r.rpush('AnnualEnterprise:gnshqy_socialCode', item)
    closeSql(cnx, cursor)

#雪球网年报定时任务
def AnnualEnterpriseXueQ_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
    # 每年执行一次
    scheduler.add_job(AnnualEnterpriseXueQ, 'cron', month=1, day=1, hour=0, minute=0)
    try:
        # 定时开始前执行一次
        AnnualEnterpriseXueQ()
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
        pass
#国外企业基本信息
def BaseInfoEnterpriseAbroad():
    cnx, cursor = connectSql()
    # 获取国外企业
    gn_query = "select id from EnterpriseInfo where Place = '2' limit 10 "
    cursor.execute(gn_query)
@@ -183,9 +258,24 @@ def BaseInfoEnterpriseAbroad():
    print('=======')
    for item in gn_social_list:
        r.rpush('BaseInfoEnterprise:gwqy_socialCode', item)
    closeSql(cnx, cursor)

#国外基本信息定时任务
def BaseInfoAbroad_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
    # 每个月执行一次
    scheduler.add_job(BaseInfoEnterpriseAbroad, 'cron', day=1, hour=0, minute=0)
    try:
        # redisPushData # 定时开始前执行一次
        BaseInfoEnterpriseAbroad()
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
        pass

#福布斯=====从数据库中读取信息放入redis
def FBS():
    cnx, cursor = connectSql()
    # todo:调整为获取福布斯的数据库
    # gw_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=2"
    # cursor.execute(gw_query)
@@ -198,17 +288,14 @@ def FBS():
    gn_social_list = [item[0] for item in gn_result]
    # gw_social_list = [item[0] for item in gw_result]
    #
    # for item in gw_social_list:
    #     r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
    for item in gn_social_list:
        if not r.exists(item):
            r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
    closeSql(cnx, cursor)
if __name__ == "__main__":
    start = time.time()
@@ -216,14 +303,13 @@ if __name__ == "__main__":
    # AnnualEnterpriseIPO()
    # AnnualEnterprise()
    # BaseInfoEnterpriseAbroad()
    NewsEnterprise_task()
    # NewsEnterprise()
    # BaseInfoEnterprise()
    # FBS()
    NoticeEnterprise_task()
    AnnualEnterprise_task()
    # NoticeEnterprise()
    # FinanceFromEast()
    log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
    # cnx.close()
    # cursor.close()
    # basecore.close()
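
One caveat for the __main__ block above: BlockingScheduler.start() does not return, so once NewsEnterprise_task() starts its scheduler, the NoticeEnterprise_task() and AnnualEnterprise_task() calls after it are never reached. A minimal sketch of one way to drive several schedules from a single process, assuming APScheduler's BackgroundScheduler is acceptable here (the trigger times below are illustrative, not the production settings), reusing the job functions defined above:

import time
from apscheduler.schedulers.background import BackgroundScheduler

def run_all_tasks():
    scheduler = BackgroundScheduler()
    scheduler.add_job(NewsEnterprise, 'cron', hour=0, minute=0, max_instances=2)  # 企业动态,每天一次
    scheduler.add_job(NoticeEnterprise, 'cron', hour=0, minute=30)                # 企业公告,每天一次(时间仅示意)
    scheduler.add_job(AnnualEnterprise, 'cron', month=1, day=1, hour=1)           # 企业年报,每年一次(时间仅示意)
    scheduler.start()       # BackgroundScheduler 不阻塞当前线程
    while True:
        time.sleep(60)      # 保持主进程存活
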
"""
"""
修改东方财富网财务数据 存储redis的方式 修改成功
"""
import requests, json, time, re, random, pymysql, redis
from datetime import datetime,timedelta
import pandas as pd
from bs4 import BeautifulSoup
from base.BaseCore import BaseCore
baseCore = BaseCore()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
# 判断股票代码是否存在
def check_code(com_code):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    res = r.exists('com_caiwushuju_code::' + com_code)
    #如果key存在 则不是第一次采集该企业, res = 1
    if res:
        return False  #表示不是第一次采集
    else:
        return True  #表示是第一次采集

def check_date(com_code, info_date):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    res = r.sismember('com_caiwushuju_date::' + com_code, info_date)  # 注意是 保存set的方式
    if res:
        return True
    else:
        return False
# 将采集后的股票代码对应的报告期保存进redis
def add_date(com_code, date_list):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    #遍历date_list 放入redis
    for date in date_list:
        # key 需与 check_date 读取的 'com_caiwushuju_date::' 保持一致,否则报告期去重永远不会命中
        res = r.sadd('com_caiwushuju_date::' + com_code, date)
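
The three helpers above keep the per-company dedup state in Redis db 3: an existence check on 'com_caiwushuju_code::<com_code>' flags whether the company has been collected before (ynFirst), and a set under 'com_caiwushuju_date::<com_code>' records which report periods are already done. A minimal sketch of how the two keys interact, using the same connection parameters as above (the sample stock code and report date are placeholders):

import redis

r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
com_code = 'sh600000'                                                  # 示例股票代码
first_time = not r.exists('com_caiwushuju_code::' + com_code)          # 是否第一次采集该企业
if not r.sismember('com_caiwushuju_date::' + com_code, '2022-12-31'):  # 该报告期是否已采
    # ... 采集 2022-12-31 这一期的三张报表 ...
    r.sadd('com_caiwushuju_date::' + com_code, '2022-12-31')           # 标记该报告期已采
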
# 根据信用代码、股票代码、报告时间采集三张表的数据
def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType):
dic_info = {}
# 第一次采集的股票代码做处理
for nnn in range(0, 3):
try:
ynFirst = check_code(com_code)
break
except:
time.sleep(1)
#判断该报告期是否已采过
for nnn in range(0, 3):
try:
panduan = check_date(com_code,info_date)
if panduan:
return dic_info
else:
pass
break
except:
time.sleep(1)
# 页面url,用于采集字段名称
url_name = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/Index?type=web&code={com_code}'
# print(f'url_name:{url_name}')
#todo:循环20次还是采集不到的记录
try:
start_time = time.time()
for i in range(1, 20):
# 资产负债表,返回资产负债表json数据
url_data_zcfzb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/zcfzbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates={info_date}&code={com_code}'
# 利润表,返回利润表json数据
url_data_lrb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/lrbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates={info_date}&code={com_code}'
# 现金流量表,返回现金流量表json数据
url_data_xjllb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/xjllbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates={info_date}&code={com_code}'
res_data_zcfzb = requests.get(url_data_zcfzb1)
res_data_lrb = requests.get(url_data_lrb1)
res_data_xjllb = requests.get(url_data_xjllb1)
#如果没有解析成功就继续循环
try:
data_json_zcfzb = res_data_zcfzb.json()['data'][0]
print(f'{info_date}第{i}次解析成功')
except:
continue
#只要第一个能解析成功那其他的就都可以解析成功
data_json_lrb = res_data_lrb.json()['data'][0]
data_json_xjllb = res_data_xjllb.json()['data'][0]
res_name = requests.get(url_name)
soup_name = BeautifulSoup(res_name.content, 'html.parser')
#第一个表
try:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_qy'})
if script_zcfzb:
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
else:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_qs'})
if script_zcfzb:
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
else:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_yh'})
if script_zcfzb:
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
# bx
else:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_bx'})
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
except:
log.info(f'---error: {social_code}, {com_code}---')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}资产负债表失败')
#第二个表
try:
script_lrb = soup_name.find('script', {'id': 'lrb_qy'})
if script_lrb:
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
else:
script_lrb = soup_name.find('script', {'id': 'lrb_qs'})
if script_lrb:
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
else:
# zcfzb_yh
script_lrb = soup_name.find('script', {'id': 'lrb_yh'})
if script_lrb:
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
else:
script_lrb = soup_name.find('script', {'id': 'lrb_bx'})
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
except:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}利润表失败')
#第三个表
try:
script_xjllb = soup_name.find('script', {'id': 'xjllb_qy'})
if script_xjllb:
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
else:
script_xjllb = soup_name.find('script', {'id': 'xjllb_qs'})
if script_xjllb:
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
else:
script_xjllb = soup_name.find('script', {'id': 'xjllb_yh'})
if script_xjllb:
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
else:
script_xjllb = soup_name.find('script', {'id': 'xjllb_bx'})
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
except:
log.info(f'---error: {social_code}, {com_code}---')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}现金流量表失败')
list_zcfzb = []
for one_info in soup_zcfzb.find_all('tr')[2:]:
if 'value.' not in one_info.text:
continue
info_name = one_info.find('span').text
if '审计意见' in info_name:
continue
info_name_en = re.findall('value\.(.*?)\)}}', one_info.text)[0]
if info_name_en:
try:
info_data = data_json_zcfzb[info_name_en]
except:
continue
else:
continue
if not info_data:
info_data = '--'
dic_info_zcfzb = {
"name": info_name,
'enName': info_name_en,
"value": info_data
}
list_zcfzb.append(dic_info_zcfzb)
log.info(f'----list_zcfzb:采集条数{len(list_zcfzb)}----')
list_lrb = []
for one_info in soup_lrb.find_all('tr')[2:]:
if 'value.' not in one_info.text:
continue
info_name = one_info.find('span').text
if '审计意见' in info_name:
continue
info_name_en = re.findall('value\.(.*?)\)}}', one_info.text)[0]
if info_name_en:
try:
info_data = data_json_lrb[info_name_en]
except:
continue
else:
continue
if not info_data:
info_data = '--'
dic_info_lrb = {
"name": info_name,
'enName': info_name_en,
"value": info_data
}
list_lrb.append(dic_info_lrb)
list_xjllb = []
for one_info in soup_xjllb.find_all('tr')[2:]:
if '补充资料' in one_info.text:
break
if 'value.' not in one_info.text:
continue
info_name = one_info.find('span').text
if '审计意见' in info_name:
continue
info_name_en = re.findall('value\.(.*?)\)}}', one_info.text)[0]
if info_name_en:
try:
info_data = data_json_xjllb[info_name_en]
except:
continue
else:
continue
if not info_data:
info_data = '--'
dic_info_xjllb = {
"name": info_name,
'enName': info_name_en,
"value": info_data
}
list_xjllb.append(dic_info_xjllb)
dic_info = {
"socialCreditCode": social_code,
"securitiesCode": com_code[2:],
"date": info_date,
"debt": list_zcfzb,
"profit": list_lrb,
"cash": list_xjllb,
"ynFirst": ynFirst,
}
#当前报告期加入列表
info_date_list.append(info_date)
return dic_info
except:
start_time = time.time()
try:
for i in range(1, 20):
# 资产负债表,返回资产负债表json数据
url_data_zcfzb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/zcfzbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates=2023-03-31%2C2022-12-31%2C2022-09-30%2C2022-06-30%2C2022-03-31&code={com_code}'
# 利润表,返回利润表json数据
url_data_lrb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/lrbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates=2023-03-31%2C2022-12-31%2C2022-09-30%2C2022-06-30%2C2022-03-31&code={com_code}'
# 现金流量表,返回现金流量表json数据
url_data_xjllb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/xjllbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates=2023-03-31%2C2022-12-31%2C2022-09-30%2C2022-06-30%2C2022-03-31&code={com_code}'
res_data_zcfzb = requests.get(url_data_zcfzb1)
res_data_lrb = requests.get(url_data_lrb1)
res_data_xjllb = requests.get(url_data_xjllb1)
# 如果没有解析成功就继续循环
try:
data_json_zcfzb = res_data_zcfzb.json()['data'][0]
log.info(f'----{com_code}---{info_date}--第{i}次解析成功-----')
except:
continue
# 只要第一个能解析成功那其他的就都可以解析成功
data_json_lrb = res_data_lrb.json()['data'][0]
data_json_xjllb = res_data_xjllb.json()['data'][0]
res_name = requests.get(url_name)
soup_name = BeautifulSoup(res_name.content, 'html.parser')
# 第一个表
try:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_qy'})
if script_zcfzb:
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
else:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_qs'})
if script_zcfzb:
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
else:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_yh'})
if script_zcfzb:
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
# bx
else:
script_zcfzb = soup_name.find('script', {'id': 'zcfzb_bx'})
soup_zcfzb = BeautifulSoup(script_zcfzb.text.strip(), 'lxml')
except:
log.info(f'---error: {social_code}, {com_code}---')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}资产负债表失败')
# 第二个表
try:
script_lrb = soup_name.find('script', {'id': 'lrb_qy'})
if script_lrb:
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
else:
script_lrb = soup_name.find('script', {'id': 'lrb_qs'})
if script_lrb:
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
else:
# zcfzb_yh
script_lrb = soup_name.find('script', {'id': 'lrb_yh'})
if script_lrb:
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
else:
script_lrb = soup_name.find('script', {'id': 'lrb_bx'})
soup_lrb = BeautifulSoup(script_lrb.text.strip(), 'lxml')
except:
log.info(f'---error: {social_code}, {com_code}---')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}利润表失败')
# 第三个表
try:
script_xjllb = soup_name.find('script', {'id': 'xjllb_qy'})
if script_xjllb:
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
else:
script_xjllb = soup_name.find('script', {'id': 'xjllb_qs'})
if script_xjllb:
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
else:
script_xjllb = soup_name.find('script', {'id': 'xjllb_yh'})
if script_xjllb:
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
else:
script_xjllb = soup_name.find('script', {'id': 'xjllb_bx'})
soup_xjllb = BeautifulSoup(script_xjllb.text.strip(), 'lxml')
except:
log.info(f'---error: {social_code}, {com_code}---')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}现金流量表失败')
list_zcfzb = []
for one_info in soup_zcfzb.find_all('tr')[2:]:
if 'value.' not in one_info.text:
continue
info_name = one_info.find('span').text
if '审计意见' in info_name:
continue
info_name_en = re.findall('value\.(.*?)\)}}', one_info.text)[0]
if info_name_en:
try:
info_data = data_json_zcfzb[info_name_en]
except:
continue
else:
continue
if not info_data:
info_data = '--'
dic_info_zcfzb = {
"name": info_name,
'enName': info_name_en,
"value": info_data
}
list_zcfzb.append(dic_info_zcfzb)
log.info(f'----list_zcfzb:采集条数{len(list_zcfzb)}----')
list_lrb = []
for one_info in soup_lrb.find_all('tr')[2:]:
if 'value.' not in one_info.text:
continue
info_name = one_info.find('span').text
if '审计意见' in info_name:
continue
info_name_en = re.findall('value\.(.*?)\)}}', one_info.text)[0]
if info_name_en:
try:
info_data = data_json_lrb[info_name_en]
except:
continue
else:
continue
if not info_data:
info_data = '--'
dic_info_lrb = {
"name": info_name,
'enName': info_name_en,
"value": info_data
}
list_lrb.append(dic_info_lrb)
list_xjllb = []
for one_info in soup_xjllb.find_all('tr')[2:]:
if '补充资料' in one_info.text:
break
if 'value.' not in one_info.text:
continue
info_name = one_info.find('span').text
if '审计意见' in info_name:
continue
info_name_en = re.findall('value\.(.*?)\)}}', one_info.text)[0]
if info_name_en:
try:
info_data = data_json_xjllb[info_name_en]
except:
continue
else:
continue
if not info_data:
info_data = '--'
dic_info_xjllb = {
"name": info_name,
'enName': info_name_en,
"value": info_data
}
list_xjllb.append(dic_info_xjllb)
dic_info = {
"socialCreditCode": social_code,
"securitiesCode": com_code[2:],
"date": info_date,
"debt": list_zcfzb,
"profit": list_lrb,
"cash": list_xjllb,
"ynFirst": ynFirst,
}
info_date_list.append(info_date)
return dic_info
except:
# delist_json = {'info_date':info_date,'com_code': com_code, 'social_code': social_code}
log.info(f'---{info_date}报告期无数据,股票代码:{com_code}----')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}--报告期无数据--{com_code}')
#如果本期无数据 就把日期记录下来
delist_all.append(info_date)
def getReportTime():
    # timeNow = baseCore.getNowTime(1)[:10]
    list_date = []
    # 2023-04-01
    # 获取当前日期和时间
    current_date = datetime.now()
    # 计算昨天的日期
    yesterday = current_date - timedelta(days=1)
    # 格式化昨天的日期
    report_date = yesterday.strftime('%Y-%m-%d')
    list_date.append(report_date)
    year = int(current_date.strftime('%Y'))
    # list_date = ['2023-03-31']
    list_month = ['-12-31', '-09-30', '-06-30', '-03-31']
    for year in range(2022, 2018, -1):
        for month in list_month:
            date = str(year) + month
            list_date.append(date)
    return list_date
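
For reference, the list getReportTime() builds starts with yesterday's date and is followed by the quarter-end dates of 2022 back through 2019; assuming the script runs on 2023-08-15 it would look like:

# getReportTime() -> ['2023-08-14',
#                     '2022-12-31', '2022-09-30', '2022-06-30', '2022-03-31',
#                     '2021-12-31', '2021-09-30', '2021-06-30', '2021-03-31',
#                     '2020-12-31', '2020-09-30', '2020-06-30', '2020-03-31',
#                     '2019-12-31', '2019-09-30', '2019-06-30', '2019-03-31']
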
def job(taskType):
# 将上市企业库中的全部A股代码存入list
# 需要提供股票代码、企业信用代码
while True:
#从redis中获取企业信用代码
social_code = baseCore.redicPullData('FinanceFromEast:finance_socialCode')
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
continue
sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where category = '1' and social_credit_code='{social_code}' '''
cursor.execute(sql_sel)
row = cursor.fetchone()
securities_code = row[0]
exchange = row[1]
# for code in list_code:
# social_code = rows[0]
# exchange = rows[2]
# if code==rows[1]:
# securities_code = code
# else:
# continue
if exchange == 1:
com_code = 'bj' + securities_code
if exchange == 2:
com_code = 'sh' + securities_code
if exchange == 3:
com_code = 'sz' + securities_code
# if com_code=='sz002163':
list_date = getReportTime()
delist = [] # 记录该企业所有无数据的报告期
date_list = [] # 记录该企业所有数据的报告期
start_time = time.time()
# 分别对每个报告期进行采集
for info_date in list_date:
delist_all = []
info_date_list = []
dic_info = get_info(social_code, com_code, info_date, delist_all, info_date_list,taskType)
# print(dic_info)
# 将采集后的报告期存入redis
if len(dic_info)!=0:
# 调凯歌接口存储数据
data = json.dumps(dic_info)
# print(data)
url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
for nnn in range(0, 3):
try:
res_baocun = requests.post(url_baocun, data=data)
break
except:
time.sleep(1)
print(res_baocun.text)
if len(info_date_list) != 0:
for date in info_date_list:
date_list.append(date)
print(date_list)
date_list = str(date_list)
for nnn in range(0, 3):
try:
add_date(com_code,date_list)
break
except:
time.sleep(1)
end_time = time.time()
log.info(f'===={com_code}====该企业耗时{end_time-start_time}===')
cnx.close()
cursor.close()
baseCore.close()
if __name__=='__main__':
    task_type = '财务数据/东方财富网'
    job(task_type)
@@ -20,7 +20,9 @@ headers={
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', charset='utf8mb4')
cursor = cnx.cursor()
taskType = '股票代码/东方财富网'

def getTotal(pageSize, start):
    total = 0
    for num in range(3):
        try:
@@ -34,10 +36,15 @@ def getTotal(pageSize):
            content = baseCore.getSubStr(content, '{', '}')
            retJson = json.loads(content)
            total = retJson['data']['total']
            response.close()
            break
        except Exception as e:
            log.info(f"------第{num}次出错---{e}")
            continue
    exception = '链接失败'
    state = 0
    takeTime = baseCore.getTimeCost(start, time.time())
    baseCore.recordLog('', taskType, state, takeTime, 'http://quote.eastmoney.com/center/gridlist.html?st=ChangePercent&sortType=C&sortRule=-1#hs_a_board', exception)
    return total
@@ -67,21 +74,30 @@ def getPageDta(pageIndex,pageSize,totalPage):
                    continue
                else:
                    log.info(f"{gpdm}-------{name}---新增")
                    insertSql = f"insert into gpdm(gpdm,name,state,create_date) values ('{gpdm}','{name}',1,now())"
                    cursor.execute(insertSql)
                    cnx.commit()
            response.close()
            log.info(f"【{pageIndex}/{totalPage}】-----------end,耗时{baseCore.getTimeCost(start, time.time())}")
            break
        except Exception as e:
            log.info(f"------第{num}次出错---{e}")
            continue
    exception = f'第{pageIndex}页链接失败'
    state = 0
    takeTime = baseCore.getTimeCost(start, time.time())
    baseCore.recordLog('', taskType, state, takeTime, '', exception)

def doJob():
    pageSize = 20
    start_time = time.time()
    total = getTotal(pageSize, start_time)
    if total == 0:
        exception = '股票代码总数为零'
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog('', taskType, state, takeTime, 'http://quote.eastmoney.com/center/gridlist.html?st=ChangePercent&sortType=C&sortRule=-1#hs_a_board', exception)
        log.info(f"股票代码总数-----------{total},请检查")
        return
    log.info(f"股票代码总数-----------{total}")
@@ -91,6 +107,9 @@ def doJob():
    totalPage = total // pageSize + 1
    for pageIndex in range(1, totalPage + 1):
        getPageDta(pageIndex, pageSize, totalPage)
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog('', taskType, state, takeTime, '', '')
    # 释放资源
    cursor.close()
    cnx.close()
......
@@ -41,30 +41,6 @@ def add_url(sid, article_url):
    else:
        return False

def get_proxy():
    cnx = pymysql.connect(host="114.115.159.144", user="root", password="zzsn9988", db="clb_project", charset="utf8mb4")
    with cnx.cursor() as cursor:
        sql = "select proxy from clb_proxy"
        cursor.execute(sql)
        proxy_lists = cursor.fetchall()
        ip_list = []
        for proxy_ in proxy_lists:
            ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
        proxy_list = []
        for str_ip in ip_list:
            str_ip_list = str_ip.split('-')
            proxyMeta = "http://%(host)s:%(port)s" % {
                "host": str_ip_list[0],
                "port": str_ip_list[1],
            }
            proxy = {
                "HTTP": proxyMeta,
                "HTTPS": proxyMeta
            }
            proxy_list.append(proxy)
    return proxy_list

#定时
def getFromSql():
    selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
    cursor.execute(selectSql)
@@ -93,7 +69,7 @@ def flushAndGetToken(list_b):

def rePutIntoR(item):
    r.rpush('WeiXinGZH:infoSourceCode', item)

def get_info(sid, json_search, origin, url_, info_source_code, page):
    list_all_info = []
    num_caiji = 0
    kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -103,7 +79,7 @@ def get_info(sid,json_search,origin,info_source_code):
        server='https://obs.cn-north-1.myhuaweicloud.com'  # 你的桶的地址
    )
    list_all_news = json_search['app_msg_list']
    #采集第几篇文章
    for one_news in list_all_news:
        news_title = one_news['title']
        timestamp = one_news['create_time']
@@ -114,10 +90,12 @@ def get_info(sid,json_search,origin,info_source_code):
        url_ft = check_url(sid, url_news)
        if url_ft:
            log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
            return list_all_info, num_caiji
        try:
            ip = baseCore.get_proxy()
            res_news = requests.get(url_news, timeout=20, proxies=ip)
            time.sleep(2)
        except:
            continue
        soup_news = BeautifulSoup(res_news.content, 'html.parser')
@@ -132,16 +110,17 @@ def get_info(sid,json_search,origin,info_source_code):
        try:
            news_content = news_html.text
        except:
            log.info(f'----{origin}--第{page}页--该篇文章内容为空--{url_news}-----')
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            false = [
                news_title,
                url_,
                info_source_code,
                '文章内容为空',
                time_now,
                url_news
            ]
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,error_type,create_time,news_url) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(false))
            cnx_.commit()
            continue
@@ -182,6 +161,8 @@ def get_info(sid,json_search,origin,info_source_code):
            section.name = 'div'
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        #将信息传输到kafka中
        dic_info = {
            'sid': sid,
            'title': news_title,
@@ -196,8 +177,8 @@ def get_info(sid,json_search,origin,info_source_code):
            'createDate': time_now
        }
        for nnn in range(0, 3):
            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
            try:
                kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
                kafka_time_out = kafka_result.get(timeout=10)
                add_url(sid, url_news)
@@ -205,8 +186,11 @@ def get_info(sid,json_search,origin,info_source_code):
            except:
                time.sleep(5)
                continue
            finally:
                producer.close()
        num_caiji = num_caiji + 1
        list_all_info.append(dic_info)
        time.sleep(5)
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        dic_info2 = {
            'infoSourceId': sid,
@@ -235,18 +219,18 @@ def job(count,key):
    log.info('===========获取公众号============')
    start_ = time.time()
    #todo:redis中数据 pop一条
    infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
    if infoSourceCode == 'None' or infoSourceCode == None:
        #当一次采集完之后,重新插入数据并等待插入完成
        getFromSql()
        time.sleep(20)
        log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
        return count

    sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
    # '一带一路百人论坛'
    # sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
    cursor.execute(sql)
    row = cursor.fetchone()
@@ -282,28 +266,28 @@ def job(count,key):
        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
        cursor_.execute(insertSql, tuple(error))
        cnx_.commit()
        return count

    fakeid = biz + '=='
    url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
    #获取页数
    try:
        # ip = baseCore.get_proxy()
        json_search = s.get(url_search, headers=headers,
                            verify=False).json()  # , proxies=ip, verify=False
        str_t = json.dumps(json_search)
        time.sleep(1)
    except Exception as e:
        log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
        rePutIntoR(info_source_code)
        time.sleep(20)
        return count

    # {"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
    # TODO:需要判断返回值,根据返回值判断是封号还是biz错误
    # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
    # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
    # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
    ret = json_search['base_resp']['ret']
    if ret == 0:
        pass
    elif ret == 200013:
@@ -318,10 +302,9 @@ def job(count,key):
        # log.info('=======等待时间600秒=====刷新浏览器=====')
        # browser_run = list_b[0]
        # browser_run.refresh()
        r.set(key, 50)
        r.expire(key, 5400)
        return count
    elif ret == 200002:
        # 公众号链接错误 保存库里 记录错误信息及错误类型
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -336,8 +319,8 @@ def job(count,key):
        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
        cursor_.execute(insertSql, tuple(error))
        cnx_.commit()
        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
        return count
    elif ret == 200003:
        # 无效的session
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -353,7 +336,7 @@ def job(count,key):
        cursor_.execute(insertSql, tuple(error))
        cnx_.commit()
        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
        return count
    else:
        log.info(f'----其他情况-----{json_search}---公众号{origin}------')
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -368,55 +351,84 @@ def job(count,key):
        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
        cursor_.execute(insertSql, tuple(error))
        cnx_.commit()
        return count
    try:
        Max_data = int(json_search['app_msg_cnt'])
        Max_page = int(int(json_search['app_msg_cnt']) / 5)
        if int(json_search['app_msg_cnt']) % 5 != 0:
            Max_page = Max_page + 1
        else:
            Max_page = Max_page
    except:
        Max_page = 1
        Max_data = 5
    log.info(f'开始采集{origin}-----共{Max_page}页---{Max_data}条数据-----')
    for i in range(0, Max_data, 5):
        url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
        # url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
        # https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
        try:
            # ip = get_proxy()[random.randint(0, 3)]
            json_search = s.get(url_search, headers=headers,
                                verify=False).json()  # , proxies=ip, verify=False
            str_t = json.dumps(json_search)
            time.sleep(2)
        except Exception as e:
            log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
            rePutIntoR(info_source_code)
            return count
        list_all = json_search['app_msg_list']
        try:
            #开始采集每一页文章信息
            page = int(i/5+1)
            log.info(f'---{origin}---------开始采集第{page}个分页-----------')
            list_all_info, num_caiji = get_info(sid, json_search, origin, url_, info_source_code, page)
            print(f'----第{page}页采集到文章个数-----{len(list_all_info)}------{num_caiji}-------')
            time.sleep(2)
            if len(list_all_info) != 0:
                count += 1
                time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                success = [
                    origin,
                    url_,
                    info_source_code,
                    '采集成功',
                    num_caiji,
                    time_now,
                ]
                #成功信息保存
                insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
                cursor_.execute(insertSql, tuple(success))
                cnx_.commit()
                # 该公众号的所有文章采集完成
                log.info(f'---第{page}页采集到文章个数---{len(list_all_info)}---{num_caiji}---耗时{baseCore.getTimeCost(start_,time.time())}')
            else:
                log.info(f'----第{page}页采集到文章个数{num_caiji}--网址已存在!-----耗时{baseCore.getTimeCost(start_,time.time())}')
                return count
        except Exception as e:
            # json解析该公众号成功但采集数据失败
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            false = [
                origin,
                url_,
                info_source_code,
                e,
                '采集失败',
                time_now
            ]
            # 失败信息保存
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(false))
            cnx_.commit()
            log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
    count += 1
    log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_, time.time())}')
    time.sleep(2)
    return count
if __name__=="__main__":
    requests.DEFAULT_RETRIES = 5
    time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -456,15 +468,19 @@ if __name__=="__main__":
    # 设置计数器的初始值为0
    key = baseCore.getNextSeq()
    r.set(key, 0)
    # 设置key的过期时间为一个半小时
    r.expire(key, 5400)
    while True:
        time.sleep(2)
        new_value = baseCore.incrSet(key)
        baseCore.getttl(key)
        if new_value < 50:
            try:
                aa = job(count, key)
                count = aa
                time.sleep(20)
            except:
                time.sleep(10)
        else:
            #刷新浏览器
            browser_run = list_b[0]
......