Commit 637feb1a  Author: LiuLiYuan

Merge remote-tracking branch 'origin/master'

@@ -556,9 +556,18 @@ class BaseCore:
         self.cursor.execute(query)
         token_list = self.cursor.fetchall()
         self.cnx.commit()
-        token = token_list[random.randint(0, len(token_list) - 1)][0]
+        try:
+            token = token_list[random.randint(0, len(token_list) - 1)][0]
+        except:
+            token = ''
         return token

+    # Delete an invalid token
+    def delete_token(self, token):
+        deletesql = f"delete from QCC_token where token='{token}' "
+        self.cursor.execute(deletesql)
+        self.cnx.commit()
+
     # Get a Tianyancha token
     def GetTYCToken(self):
         query = 'select token from TYC_token'
......
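The new delete_token builds its SQL with an f-string. A minimal sketch of the same two helpers using parameterized queries instead, where cursor and cnx stand in for the self.cursor / self.cnx handles BaseCore already owns:

import random

# Sketch only: parameterized equivalents of GetToken/delete_token above.
def get_token(cursor, cnx):
    cursor.execute('select token from QCC_token')
    token_list = cursor.fetchall()
    cnx.commit()
    # random indexing on an empty list raises IndexError; return '' like the diff's except branch
    return random.choice(token_list)[0] if token_list else ''

def delete_token(cursor, cnx, token):
    # %s placeholders let the MySQL driver do the quoting, instead of f-string interpolation
    cursor.execute('delete from QCC_token where token = %s', (token,))
    cnx.commit()

Placeholders sidestep both SQL injection and broken quoting when a token happens to contain a quote character.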
@@ -9,7 +9,8 @@ import json
 from kafka import KafkaProducer
 from base.BaseCore import BaseCore
 from getQccId import find_id_by_name
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 baseCore = BaseCore()
 cnx_ = baseCore.cnx
 cursor_ = baseCore.cursor
@@ -323,20 +324,28 @@ if __name__ == '__main__':
         'Accept-Encoding': 'gzip, deflate, br,'
     }
+    name_list = []
     # Pull data from redis
     while True:
         # TODO: the token must be re-captured every two hours or so; it is fetched from the database
         token = baseCore.GetToken()
-        list_weicha = []
-        list_all_info = []
-        name_list = []
+        if token:
+            pass
+        else:
+            log.info('==========已无token==========')
+            time.sleep(30)
+            continue
         start_time = time.time()
         # Fetch the enterprise info
         # social_code = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
-        social_code = '91330000734530895W'
-        if social_code == '':
+        social_code = '91130800757548430L'
+        if social_code == '' or social_code is None:
             time.sleep(20)
             continue
+        if '搜索不到' in social_code:
+            continue
+        else:
+            pass
         dic_info = baseCore.getInfomation(social_code)
         log.info(f'----当前企业{social_code}--开始处理---')
         count = dic_info[14]
@@ -350,10 +359,16 @@ if __name__ == '__main__':
                 company_id = find_id_by_name(start_time,token,social_code)
             else:
                 company_id = find_id_by_name(start_time,token,com_name)
+        if company_id == 'null':
+            log.info('=====搜索不到该企业====')
+            # todo: companies that cannot be found have no credit code, so they cannot be forwarded; generate one
+            baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_socialCode', social_code + ':搜索不到')
+            continue
         if not company_id:
             log.info(com_name + ":企业ID获取失败===重新放入redis")
-            list_weicha.append(com_name + ":企业ID获取失败")
             baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_socialCode',social_code)
+            baseCore.delete_token(token)
+            log.info('=====已重新放入redis,失效token已删除======')
             time.sleep(20)
             continue
         else:
@@ -367,27 +382,32 @@ if __name__ == '__main__':
         except:
             log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
             baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_social_code', social_code)
+            baseCore.delete_token(token)
+            log.info('=====已重新放入redis,失效token已删除======')
             continue
         if post_data_list:
             pass
         else:
+            log.info(f'======{social_code}====企查查token失效====')
             time.sleep(20)
             continue
         for post_data in post_data_list:
-            list_all_info.append(post_data)
             if post_data is None:
-                print(com_name + ":企业信息获取失败")
-                list_weicha.append(com_name + ":企业信息获取失败")
+                log.info(com_name + ":企业信息获取失败")
                 continue
             get_name = post_data['name']
             get_socialcode = post_data['socialCreditCode']
+            # todo: update the credit code or company name into the table
+            # updatesocialcode = f"update Global100 set SocialCode = '{get_socialcode}' where CompanyName = '{com_name}'"
+            # cursor_.execute(updatesocialcode)
+            # cnx_.commit()
             name_compile = {
                 'yuan_name':com_name,
                 'get_name':get_name
             }
             name_list.append(name_compile)
+            nowtime = baseCore.getNowTime(1).replace('-', '_')[:10]
+            baseCore.writerToExcel(name_list, f'企业名称对比.xlsx')
             log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time,time.time())}')
             try:
                 producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
@@ -403,12 +423,6 @@ if __name__ == '__main__':
         runType = 'BaseInfoRunCount'
         count += 1
         baseCore.updateRun(social_code, runType, count)
-        nowtime = baseCore.getNowTime(1).replace('-','_')[:10]
-        companyName = pd.DataFrame(name_list)
-        companyName.to_excel(f'./data/企业名称对比_{nowtime}.xlsx',index=False)
-        false_com = pd.DataFrame(list_weicha)
-        false_com.to_excel(f'./data/采集失败企业名单_{nowtime}.xlsx',index=False)
......
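The loop above builds a fresh KafkaProducer for every company before sending to regionInfo. A hedged sketch of hoisting the producer out of the loop; broker address, api_version, and topic come from the diff, while the value_serializer is an added convenience, not part of the original:

import json
from kafka import KafkaProducer

# Create once, reuse across iterations instead of reconnecting per company.
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],
                         api_version=(2, 0, 2),
                         value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf8'))

def send_base_info(post_data, timeout=10):
    future = producer.send('regionInfo', post_data)
    return future.get(timeout=timeout)  # raises on delivery failure, mirroring the try/except above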
# -*- coding: utf-8 -*-
import time
from urllib.parse import quote

import requests
import json
import urllib3
from kafka import KafkaProducer
from base.BaseCore import BaseCore

baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor

# Look up the QCC company id by enterprise name or credit code
def find_id_by_name(start, token, social_code):
    urllib3.disable_warnings()
    qcc_key = social_code
    t = str(int(time.time()) * 1000)
    headers['Qcc-Timestamp'] = t
    url = f"https://xcx.qcc.com/mp-weixin/forwardApp/v3/base/advancedSearch?token={token}&t={t}&pageIndex=1&needGroup=yes&insuredCntStart=&insuredCntEnd=&startDateBegin=&startDateEnd=&registCapiBegin=&registCapiEnd=&countyCode=&province=&sortField=&isSortAsc=&searchKey={quote(qcc_key)}&searchIndex=default&industryV3="
    for lll in range(1, 6):
        try:
            resp_dict = requests.get(url=url, headers=headers, verify=False).json()
            break
        except:
            print('重试')
            time.sleep(5)
            continue
    time.sleep(2)
    # {'status': 40101, 'message': '无效的sessionToken!'} {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'}
    if resp_dict['status'] == 40101:
        KeyNo = False
        log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    if resp_dict['status'] == 401:
        KeyNo = False
        log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    if resp_dict['status'] == 40102:
        KeyNo = False
        log.info(f'=======无效的session=====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    try:
        if resp_dict['result']['Result']:
            result_dict = resp_dict['result']['Result'][0]
            KeyNo = result_dict['KeyNo']
            Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
            if Name == '':
                KeyNo = 'null'
        else:
            KeyNo = 'null'
    except:
        KeyNo = False
        log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    print("{},企业代码为:{}".format(qcc_key, KeyNo))
    return KeyNo

# Check whether a string contains a digit
def str_have_num(str_num):
    panduan = False
    for str_1 in str_num:
        ppp = str_1.isdigit()
        if ppp:
            panduan = ppp
    return panduan

# Get the company website by QCC id
def info_by_id(com_id, com_name):
    aa_dict_list = []
    t = str(int(time.time()) * 1000)
    headers['Qcc-Timestamp'] = t
    url = "https://xcx.qcc.com/mp-weixin/forwardApp/v1/ent/detail?token={}&t={}&unique={}".format(token, t, com_id)
    resp_dict = requests.get(url=url, headers=headers, verify=False).json()
    time.sleep(2)
    try:
        result_dict = resp_dict['result']['Company']
    except:
        print(com_name + ":获取失败")
    try:
        WebSite = result_dict['companyExtendInfo']['WebSite']
    except:
        WebSite = None
    if WebSite is None:
        try:
            WebSite = result_dict['ContactInfo']['WebSite'][0]['Url']
        except:
            WebSite = ''
    print(com_name + ":爬取完成")
    return WebSite

if __name__ == '__main__':
    taskType = '基本信息/企查查/id'
    headers = {
        'Host': 'xcx.qcc.com',
        'Connection': 'keep-alive',
        'Qcc-Platform': 'mp-weixin',
        'Qcc-Timestamp': '',
        'Qcc-Version': '1.0.0',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
        'content-type': 'application/json',
        'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
        'Accept-Encoding': 'gzip, deflate, br,'
    }
    name_list = []
    # Pull data from redis
    while True:
        # TODO: the token must be re-captured every two hours or so; it is fetched from the database
        token = baseCore.GetToken()
        if token:
            pass
        else:
            log.info('==========已无token==========')
            time.sleep(30)
            continue
        start_time = time.time()
        # Fetch the enterprise info
        social_code = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
        # social_code = '91130800757548430L'
        if social_code == '' or social_code is None:
            time.sleep(20)
            continue
        if '搜索不到' in social_code:
            continue
        else:
            pass
        dic_info = baseCore.getInfomation(social_code)
        log.info(f'----当前企业{social_code}--开始处理---')
        count = dic_info[14]
        com_name = dic_info[1]
        social_code = dic_info[2]
        # QCC id
        company_id = dic_info[12]
        # If there is no credit code, search by name; otherwise search by credit code
        if company_id == '' or company_id == None:
            if social_code:
                company_id = find_id_by_name(start_time, token, social_code)
            else:
                company_id = find_id_by_name(start_time, token, com_name)
            if company_id == 'null':
                log.info('=====搜索不到该企业====')
                # todo: companies that cannot be found have no credit code, so they cannot be forwarded; generate one
                baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_socialCode', social_code + ':搜索不到')
                continue
            if not company_id:
                log.info(com_name + ":企业ID获取失败===重新放入redis")
                baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_socialCode', social_code)
                baseCore.delete_token(token)
                log.info('=====已重新放入redis,失效token已删除======')
                time.sleep(20)
                continue
            else:
                log.info(f'====={social_code}===={company_id}=====获取企业id成功=====')
                # todo: write to the database
                updateSql = f"update EnterpriseInfo set QCCID = '{company_id}' where SocialCode = '{social_code}'"
                cursor_.execute(updateSql)
                cnx_.commit()
                log.info(f'----企查查id已更新-----')
        # After collection, update this enterprise's crawl count
        runType = 'BaseInfoRunCount'
        count += 1
        baseCore.updateRun(social_code, runType, count)
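find_id_by_name handles the 40101/401/40102 statuses with three near-identical branches. A sketch folding them into one lookup table; check_status and STATUS_MESSAGES are hypothetical helpers, not in the commit, and they reuse the module's log and baseCore globals shown above:

STATUS_MESSAGES = {
    40101: 'token失效',
    401: '您的账号访问超频,请升级小程序版本',
    40102: '无效的session',
}

def check_status(resp_dict, start):
    status = resp_dict.get('status')
    if status in STATUS_MESSAGES:
        log.info(f'===={STATUS_MESSAGES[status]}====时间{baseCore.getTimeCost(start, time.time())}')
        return False  # same sentinel the callers already test with `if not company_id`
    return True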
@@ -46,15 +46,19 @@ def find_id_by_name(start,token,name):
         KeyNo = False
         log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
         return KeyNo
+    if resp_dict['status']==40102:
+        KeyNo = False
+        log.info(f'=======无效的session=====时间{baseCore.getTimeCost(start, time.time())}')
+        return KeyNo
     try:
         if resp_dict['result']['Result']:
             result_dict = resp_dict['result']['Result'][0]
             KeyNo = result_dict['KeyNo']
             Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
             if Name == '':
-                KeyNo = ''
+                KeyNo = 'null'
         else:
-            KeyNo = ''
+            KeyNo = 'null'
     except:
         KeyNo = False
         log.info(f'====token失效====时间{baseCore.getTimeCost(start,time.time())}')
......
@@ -158,6 +158,8 @@ def beinWork(tyc_code, social_code,start_time):
                 contentText = smart.extract_by_url(link).text
                 # plain-text body without tags
                 content = smart.extract_by_url(link).cleaned_text
+                if len(content) < 300:
+                    continue
                 # time.sleep(3)
             except Exception as e:
                 contentText = ''
......
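The hunk above calls smart.extract_by_url(link) twice, once for .text and once for .cleaned_text, so every article is fetched and parsed twice. A sketch of the same loop-body fragment extracting once, assuming the extractor returns a single article object exposing both attributes:

article = smart.extract_by_url(link)
contentText = article.text          # body with markup
content = article.cleaned_text      # plain-text body
if len(content) < 300:              # same threshold as the diff: skip near-empty extractions
    continue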
@@ -58,7 +58,7 @@ if __name__ == '__main__':
         'Accept-Encoding': 'gzip, deflate, br',
         'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
     }
-    query = "SELECT * FROM clb_sys_attachment WHERE type_id=1 AND source='证监会'"
+    query = "SELECT * FROM clb_sys_attachment WHERE type_id=1 AND source='证监会' AND id = '18703822757'"
     cursor_.execute(query)
     results = cursor_.fetchall()
     for result in results:
......
@@ -666,7 +666,7 @@ class BaseCore:
         self.cnx_.commit()

     # Insert into the attachment table and return the attachment id
-    def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time,origin):
+    def tableUpdate(self, retData, year, pdf_name, num, pub_time,origin):
         item_id = retData['item_id']
         type_id = retData['type_id']
         group_name = retData['group_name']
......
@@ -28,8 +28,8 @@ pathType = 'QYYearReport/'
 type_id = 1
 create_by = 'XueLingKun'
 taskType = '企业年报'
-#付俊雪的需要改为巨潮资讯网1_福布斯2000_PDF_60_付
-file_path = 'D:\\年报\\福布斯2000强_年报补充_20231018'
+file_path = 'D:\\年报\\欧盟记分牌2500_年报补充_718_20231018'
 log.info(f'=============当前pid为{baseCore.getPID()}==============')

 def sendKafka(dic_news):
@@ -146,9 +146,9 @@ if __name__=='__main__':
             social_code = data[1]
             ename = data[2]
             cname = data[3]
-            file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
+            file_name = ename + ':' + file_year + '年年度报告' + '.pdf'
             content = ''
-            origin = cname + '官网'
+            origin = ename + '官网'
             # Parse the file's page count and content
             log.info(f"-----------正在处理{file_name}--------------")
             with open(pdf_path, 'rb') as file:
......
@@ -3,7 +3,9 @@
 """
 import json
 import os
+import re
 import time
+import uuid
 from kafka import KafkaProducer
 from obs import ObsClient
@@ -24,12 +26,15 @@ baseCore = BaseCore.BaseCore()
 log = baseCore.getLogger()
 cnx = baseCore.cnx
 cursor = baseCore.cursor
+cnx_ = baseCore.cnx_
+cursor_ = baseCore.cursor_
 pathType = 'QYYearReport/'
 type_id = 1
 create_by = 'XueLingKun'
 taskType = '企业年报'
 # Fu Junxue's source needs to be changed to cninfo (巨潮资讯网)
-file_path = 'D:\\BaiduNetdiskDownload\\1_福布斯2000_PDF_50_郑'
+file_path = 'D:\\年报\\年度报告\\中石化炼化工程年度报告'
 log.info(f'=============当前pid为{baseCore.getPID()}==============')

 def sendKafka(dic_news):
@@ -66,6 +71,10 @@ def sendKafka(dic_news):
         log.info(dic_result)
         return False

+def getuuid():
+    get_timestamp_uuid = uuid.uuid1()  # Generate a uuid from the timestamp to guarantee global uniqueness
+    return get_timestamp_uuid
+
 def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
     """
     retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
@@ -92,7 +101,8 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
                'category': category, 'file_size': file_size, 'status': status, 'create_by': create_by,
                'create_time': create_time, 'page_size': page_size, 'content': content}
     try:
-        result = getOBSres(pathType, file_name, pdf_path)
+        name = str(getuuid()) + '.pdf'
+        result = getOBSres(pathType, name, pdf_path)
     except:
         log = baseCore.getLogger()
         log.error(f'OBS发送失败')
@@ -117,6 +127,13 @@ def getOBSres(pathType, name, response):
     result = obsClient.putFile('zzsn', pathType+name, file_path=response)
     return result

+def secrchATT(item_id, year, type_id):
+    sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s'''
+    cursor_.execute(sel_sql, (item_id, year, type_id))
+    selects = cursor_.fetchone()
+    return selects
+
 if __name__=='__main__':
     log.info(f'-----------当前文件{file_path}---------------')
     file_list = os.listdir(file_path)
@@ -126,19 +143,27 @@ if __name__=='__main__':
             start_time = time.time()
             pdf_path = file_path + '/'+file
-            file_rank = int(file.split('-')[0])
-            file_year = file.split('-')[1]
+            # file_name_ = file.split('-')[0].replace('公司','')
+            file_year = re.findall('\d{4}', file)[0]
+            file_name_ = file.split(file_year)[0]
             # file_rank maps to the enterprise credit code
-            selectsql = f"select * from rankandcode where id = {file_rank}"
+            selectsql = f"select * from 500Report where com_name = '{file_name_}'"
             cursor.execute(selectsql)
             data = cursor.fetchone()
             cnx.commit()
-            social_code = data[1]
-            ename = data[2]
-            cname = data[3]
-            file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
+            social_code = data[2]
+            file_name = file_name_ + ':' + file_year + '年年度报告' + '.pdf'
             content = ''
+            origin = file_name_ + '官网'
+            selects = secrchATT(social_code, file_year, 1)
+            if selects:
+                # self.getLogger().info(f'com_name:{com_name}--{year}已存在')
+                log.info(f'===={file_name}--年报已存在===')
+                continue
             # Parse the file's page count and content
             log.info(f"-----------正在处理{file_name}--------------")
             with open(pdf_path, 'rb') as file:
@@ -153,7 +178,7 @@ if __name__=='__main__':
                     content += page.get_text()
                 # print(content)
             except Exception as e:
-                log.info(f'文件已损坏:{cname}')
+                log.info(f'文件已损坏:{file_name}')
                 continue
             # Parse the file size
             file_size = os.path.getsize(pdf_path)
@@ -168,8 +193,9 @@ if __name__=='__main__':
             retData_f = uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path)
             if retData_f['state']:
                 # retData, com_name, year, pdf_name, num, pub_time
-                att_id = baseCore.tableUpdate(retData_f, cname,file_year,file_name, num,file_year+'-12-31')
+                att_id = baseCore.tableUpdate(retData_f,file_year,file_name, num,file_year+'-12-31',origin)
                 if att_id:
+                    detect_language = baseCore.detect_language(content)
                     dic_news = {
                         'attachmentIds': att_id,
                         'author': '',
@@ -179,8 +205,8 @@ if __name__=='__main__':
                         'deleteFlag': '0',
                         'id': '',
                         'keyWords': '',
-                        'lang': 'zh',
-                        'origin': '企业官网',
+                        'lang': detect_language,
+                        'origin': origin,
                         'publishDate': file_year + '-12-31',
                         'sid': '1684032033495392257',
                         'sourceAddress': '',  # link to the original
@@ -191,13 +217,15 @@ if __name__=='__main__':
                         'year': file_year
                     }
                     if sendKafka(dic_news):
-                        log.info(f'成功-{file_rank}--{file_name}----{att_id}---{social_code}')
+                        log.info(f'成功---{file_name}----{att_id}---{social_code}')
                         num += 1
                     else:
-                        log.info(f'失败-{file_rank}--{file_name}----{att_id}---{social_code}')
+                        log.info(f'失败---{file_name}----{att_id}---{social_code}')
                         # Delete the inserted row; 400 means the send failed
                         baseCore.deliteATT(att_id)
                         log.info(f'已删除插入附件表的数据---{file_name}-----{social_code}')
+            else:
+                log.info(f'-----年报已存在--{social_code}--{file_name}-----')
         except Exception as e:
             log.info(f'error------{e}')
\ No newline at end of file
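This diff switches OBS uploads from title-based object names to uuid1-based keys. A short sketch of the rationale under stated assumptions: keys derived from Chinese report titles can collide and need URL-encoding, while timestamp uuids are unique and ASCII-only. make_object_key is illustrative, not in the commit:

import uuid

pathType = 'QYYearReport/'  # from the diff above

def make_object_key(path_type):
    # uuid1 keys are globally unique and ASCII-only, unlike keys built from report titles
    return path_type + str(uuid.uuid1()) + '.pdf'

# e.g. make_object_key(pathType) -> 'QYYearReport/<uuid1>.pdf'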
""" """
...@@ -465,8 +465,9 @@ def getReportTime(): ...@@ -465,8 +465,9 @@ def getReportTime():
def job(taskType): def job(taskType):
# 将上市企业库中的全部A股代码存入list # 将上市企业库中的全部A股代码存入list
# 需要提供股票代码、企业信用代码 # 需要提供股票代码、企业信用代码
flag = 0
while True: while True:
flag = 0
# 从redis中获取企业信用代码 # 从redis中获取企业信用代码
social_code = baseCore.redicPullData('FinanceFromEast:finance_socialCode') social_code = baseCore.redicPullData('FinanceFromEast:finance_socialCode')
# social_code = '91420300178856869P' # social_code = '91420300178856869P'
...@@ -487,6 +488,7 @@ def job(taskType): ...@@ -487,6 +488,7 @@ def job(taskType):
except: except:
time.sleep(1) time.sleep(1)
print(res_.text) print(res_.text)
flag = 0
continue continue
log.info(f'==========正在采集{social_code}============') log.info(f'==========正在采集{social_code}============')
sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where category = '1' and social_credit_code='{social_code}' ''' sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where category = '1' and social_credit_code='{social_code}' '''
......
@@ -28,6 +28,8 @@ log = baseCore.getLogger()
 cnx = baseCore.cnx
 cursor = baseCore.cursor
 r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=0)
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 taskType = '企业负面新闻'

 def sendKafka(dic_news):
     start_time = time.time()
@@ -93,7 +95,7 @@ def dishonesty(headers,com_name,social_code):
         pass
     else:
         log.info(f'该企业{com_name}无严重失信信息')
-        return list_dishonesty
+        return url,list_dishonesty
     for page in range(1,total_size+1):
         param_page = {
             'tableName': 'credit_zgf_fr_sxbzxr',
@@ -172,7 +174,7 @@ def punish(headers,com_name,social_code):
         pass
     else:
         log.info(f'该企业{com_name}无行政处罚信息')
-        return list_punish
+        return url,list_punish
     for page in range(1,total_size+1):
         param_page = {
             'tableName': 'credit_xyzx_fr_xzcf_new',
@@ -250,7 +252,7 @@ def abnormal(headers,com_name,social_code):
         pass
     else:
         log.info(f'该企业{com_name}无经营异常信息')
-        return list_abhormal
+        return url,list_abhormal
     for page in range(1, total_size+1):
         param_page = {
             'tableName': 'credit_xyzx_fr_xzcf_new',
@@ -293,7 +295,7 @@ def abnormal(headers,com_name,social_code):
 def dic_data(com_name,listData,type,detailurl):
     dic_news = {
         'title':com_name + type,
-        'structuredData':listData,
+        'structuredData':listData[:1],
         'ynStructure':1,
         'content': '',
         'contentHtml': '',
@@ -303,6 +305,11 @@ def dic_data(com_name,listData,type,detailurl):
     }
     return dic_news

+def insertinto(dic_):
+    sql = "INSERT INTO zhejiangfmnews (title, structuredData, source) VALUES (%s, %s, %s)"
+    cursor.execute(sql, (dic_["title"], str(dic_["structuredData"]), dic_["source"]))
+    cnx.commit()
+
 if __name__=='__main__':
     headers = {
@@ -315,21 +322,39 @@ if __name__=='__main__':
         'sec-ch-ua-mobile': '?0',
         'sec-ch-ua-platform': '"Windows"'
     }
-    com_name = '石家庄交投集团工程服务有限责任公司'
-    social_code = '91130100MA7EK14C8L'
-    url_dishonesty,list_dishonesty = dishonesty(headers,com_name,social_code)
-    dic_dishonesty = dic_data(com_name,list_dishonesty,'严重违法失信信息',url_dishonesty)
-    sendKafka(dic_dishonesty)
-    url_punish,list_punish = punish(headers,com_name,social_code)
-    dic_punish = dic_data(com_name, list_punish, '行政处罚信息', url_punish)
-    # print(dic_punish)
-    sendKafka(dic_punish)
-    url_abnormal,list_abnormal = abnormal(headers,com_name,social_code)
-    dic_abnormal = dic_data(com_name, list_abnormal, '经营异常信息', url_abnormal)
-    # print(dic_abnormal)
-    sendKafka(dic_abnormal)
+    query = "select * from zhejiang "
+    cursor.execute(query)
+    results = cursor.fetchall()
+    for result in results:
+        com_name = result[1]
+        log.info(f'------------正在采集{com_name}----------------')
+        social_code = result[0]
+        try:
+            url_dishonesty,list_dishonesty = dishonesty(headers,com_name,social_code)
+        except:
+            list_dishonesty = []
+            log.info(f'error-------{com_name}')
+            continue
+        if list_dishonesty:
+            dic_dishonesty = dic_data(com_name,list_dishonesty,'严重违法失信信息',url_dishonesty)
+            # sendKafka(dic_dishonesty)
+            insertinto(dic_dishonesty)
+            log.info(f'----{com_name}---新增')
+        url_punish,list_punish = punish(headers,com_name,social_code)
+        if list_punish:
+            dic_punish = dic_data(com_name, list_punish, '行政处罚信息', url_punish)
+            insertinto(dic_punish)
+            log.info(f'----{com_name}---新增')
+            # sendKafka(dic_punish)
+        url_abnormal,list_abnormal = abnormal(headers,com_name,social_code)
+        if list_abnormal:
+            dic_abnormal = dic_data(com_name, list_abnormal, '经营异常信息', url_abnormal)
+            insertinto(dic_abnormal)
+            log.info(f'----{com_name}---新增')
+            # sendKafka(dic_abnormal)
     # Report link
     # url_report = f'https://public.creditchina.gov.cn/credit-check/pdf/clickDownload?companyName={com_name}&entityType=1&uuid=&tyshxydm={social_code}'
......
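The new insertinto stores structuredData via str(), which yields Python repr (single quotes), not JSON. If anything downstream parses that column, a json.dumps variant is safer; a hypothetical alternative reusing the same table and cursor, not part of the commit:

import json

def insertinto_json(dic_):
    sql = "INSERT INTO zhejiangfmnews (title, structuredData, source) VALUES (%s, %s, %s)"
    # json.dumps produces valid JSON; ensure_ascii=False keeps Chinese text readable
    cursor.execute(sql, (dic_["title"],
                         json.dumps(dic_["structuredData"], ensure_ascii=False),
                         dic_["source"]))
    cnx.commit()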
title dujiaoshoubaseinfo
call activate
call conda activate zzsn@3.8.0
python baseinfo_dujiaoshou.py
pause
\ No newline at end of file
@@ -7,16 +7,17 @@ import requests
 import json
 from kafka import KafkaProducer
-from base.BaseCore import BaseCore
+from BaseCore import BaseCore
 from getQccId import find_id_by_name

 baseCore = BaseCore()
 cnx_ = baseCore.cnx
 cursor_ = baseCore.cursor
 log = baseCore.getLogger()
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

 # Get basic enterprise info by QCC id
-def info_by_id(com_id,com_name,gpdm):
+def info_by_id(com_id,com_name):
     aa_dict_list = []
     t = str(int(time.time()) * 1000)
@@ -31,7 +32,7 @@ def info_by_id(com_id,com_name):
         result_dict = resp_dict['result']['Company']
     except:
         log.info(com_name + ":获取失败===========重新放入redis")
-        baseCore.rePutIntoR('EnterpriseIpo:nq_gpdm',gpdm)
+        baseCore.rePutIntoR('china100:baseinfo',com_name)
         return aa_dict_list
     company_name = result_dict['Name']
@@ -306,12 +307,12 @@ def info_by_id(com_id,com_name):
     }
     aa_dict_list.append(aa_dict)
-    print(company_name + ":爬取完成")
+    log.info(company_name + ":爬取完成")
     return aa_dict_list

 if __name__ == '__main__':
-    taskType = '基本信息/企查查'
+    taskType = '基本信息/企查查/中国100强'
     headers = {
         'Host': 'xcx.qcc.com',
         'Connection': 'keep-alive',
@@ -323,65 +324,97 @@ if __name__ == '__main__':
         'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
         'Accept-Encoding': 'gzip, deflate, br,'
     }
+    list_weicha = []
+    name_list = []
     # Pull data from redis
     while True:
         # TODO: the token must be re-captured every two hours or so; it is fetched from the database
-        token = 'b4eb43143abdcf395f1335f322ca29e5'
-        list_weicha = []
-        list_all_info = []
-        name_list = []
+        token = baseCore.GetToken()
+        dataList = []
+        if token:
+            pass
+        else:
+            log.info('==========已无token==========')
+            time.sleep(30)
+            continue
+        # list_all_info = []
         start_time = time.time()
         # Fetch the enterprise info
-        # com_code = baseCore.redicPullData('EnterpriseIpo:nq_gpdm')
-        com_code = '873349'
-        if '.NQ' in com_code:
-            com_code1 = com_code
-        else:
-            com_code1 = com_code + '.NQ'
-        company_id = find_id_by_name(start_time,token,com_code)
-        if not company_id:
-            log.info(com_code + ":企业ID获取失败===重新放入redis")
-            list_weicha.append(com_code + ":企业ID获取失败")
-            baseCore.rePutIntoR('EnterpriseIpo:nq_gpdm',com_code)
-            log.info('-----已重新放入redis-----')
-            time.sleep(20)
-            continue
-        else:
-            log.info(f'====={com_code}===={company_id}=====获取企业id成功=====')
-            # todo: write the QCC id into the gpdm table
-            updateSql = f"update gpdm set QCCID = '{company_id}' where gpdm = '{com_code}'"
-            cursor_.execute(updateSql)
-            cnx_.commit()
+        social_code = baseCore.redicPullData('china100:baseinfo')
+        # com_name = '卓新市万达铸业有限公司'
+        if social_code == '' or social_code is None:
+            time.sleep(20)
+            continue
+        if '搜索不到' in social_code:
+            continue
+        else:
+            pass
+        dic_info = baseCore.getInfomation(social_code)
+        log.info(f'----当前企业{social_code}--开始处理---')
+        com_name = dic_info[1]
+        # QCC id
+        company_id = dic_info[3]
+        # If there is no credit code, search by name; otherwise search by credit code
+        if company_id == None or company_id == False:
+            if social_code:
+                company_id = find_id_by_name(start_time,token,social_code)
+            else:
+                company_id = find_id_by_name(start_time,token,com_name)
+        if company_id == 'null':
+            log.info('=====搜索不到该企业====')
+            # todo: companies that cannot be found have no credit code, so they cannot be forwarded; generate one
+            baseCore.rePutIntoR('china100:baseinfo', social_code + ':搜索不到')
+            continue
+        if not company_id:
+            log.info(com_name + ":企业ID获取失败===重新放入redis")
+            list_weicha.append(com_name + ":企业ID获取失败")
+            baseCore.rePutIntoR('china100:baseinfo',com_name)
+            baseCore.delete_token(token)
+            log.info('=====已重新放入redis,失效token已删除======')
+            time.sleep(20)
+            continue
+        else:
+            log.info(f'====={com_name}===={company_id}=====获取企业id成功=====')
+            # todo: write to the database
+            updateqccid = f"update China100 set qccid = '{company_id}' where CompanyName = '{com_name}'"
+            cursor_.execute(updateqccid)
+            cnx_.commit()
         try:
-            post_data_list = info_by_id(company_id, '',com_code1)
+            post_data_list = info_by_id(company_id, com_name)
         except:
-            log.info(f'====={com_code}=====获取基本信息失败,重新放入redis=====')
-            baseCore.rePutIntoR('EnterpriseIpo:nq_gpdm', com_code)
+            log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
+            baseCore.rePutIntoR('china100:baseinfo', com_name)
+            baseCore.delete_token(token)
+            log.info('=====已重新放入redis,失效token已删除======')
             continue
         if post_data_list:
             pass
         else:
-            log.info(f'======{com_code}====企查查token失效====')
+            # log.info(f'======{social_code}====企查查token失效====')
             time.sleep(20)
             continue
         for post_data in post_data_list:
-            list_all_info.append(post_data)
+            # list_all_info.append(post_data)
             if post_data is None:
-                print(com_code + ":企业信息获取失败")
-                list_weicha.append(com_code + ":企业信息获取失败")
+                print(com_name + ":企业信息获取失败")
+                list_weicha.append(com_name + ":企业信息获取失败")
                 continue
             get_name = post_data['name']
             get_socialcode = post_data['socialCreditCode']
+            # todo: update the credit code into the table
+            updatesocialcode = f"update China100 set SocialCode = '{get_socialcode}' where CompanyName = '{com_name}'"
+            cursor_.execute(updatesocialcode)
+            cnx_.commit()
             name_compile = {
-                'yuan_name':com_code,
+                'yuan_name':com_name,
                 'get_name':get_name
             }
             name_list.append(name_compile)
-            log.info(f'采集{com_code}成功=======耗时{baseCore.getTimeCost(start_time,time.time())}')
+            # dataList.append(post_data)
+            baseCore.writerToExcel(name_list,'中国100强企业.xlsx')
+            log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time,time.time())}')
             try:
                 producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
                 kafka_result = producer.send("regionInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
@@ -392,13 +425,9 @@ if __name__ == '__main__':
                 takeTime = baseCore.getTimeCost(start_time, time.time())
                 baseCore.recordLog(get_socialcode, taskType, state, takeTime, '', exception)
                 log.info(f"{get_name}--{get_socialcode}--kafka传输失败")
-        # After collection, update this enterprise's crawl count
-        nowtime = baseCore.getNowTime(1).replace('-','_')[:10]
-        companyName = pd.DataFrame(name_list)
-        companyName.to_excel(f'./data/企业名称对比_{nowtime}.xlsx',index=False)
-        false_com = pd.DataFrame(list_weicha)
-        false_com.to_excel(f'./data/采集失败企业名单_{nowtime}.xlsx',index=False)
+        # break
......
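Both versions of these scripts retry requests.get five times and then fall through to use resp_dict, which is unbound if every attempt failed. A sketch of the same retry with backoff and an explicit failure value; get_json_with_retry is a hypothetical helper, not in the commit:

import time
import requests

def get_json_with_retry(url, headers, attempts=5, delay=5):
    for attempt in range(attempts):
        try:
            return requests.get(url, headers=headers, verify=False, timeout=20).json()
        except Exception as e:
            print(f'{e}-------------重试')
            time.sleep(delay * (attempt + 1))  # linear backoff between attempts
    return None  # explicit failure instead of an unbound resp_dict

Callers can treat None like the existing False sentinel and requeue the company.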
@@ -5,21 +5,43 @@ import time
 from urllib.parse import quote
 import requests
 import urllib3
-from base.BaseCore import BaseCore
+from BaseCore import BaseCore

 baseCore = BaseCore()
 log = baseCore.getLogger()
+# headers = {
+#     'Host': 'xcx.qcc.com',
+#     'Connection': 'keep-alive',
+#     'Qcc-Platform': 'mp-weixin',
+#     'Qcc-Timestamp': '',
+#     'Qcc-Version': '1.0.0',
+#     'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
+#     'content-type': 'application/json',
+#     'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
+#     'Accept-Encoding': 'gzip, deflate, br,'
+# }
 headers = {
     'Host': 'xcx.qcc.com',
     'Connection': 'keep-alive',
-    'Qcc-Platform': 'mp-weixin',
-    'Qcc-Timestamp': '',
-    'Qcc-Version': '1.0.0',
-    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
-    'content-type': 'application/json',
-    'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
-    'Accept-Encoding': 'gzip, deflate, br,'
-}
+    'x-request-device-type': 'Android',
+    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8391',
+    'Content-Type': 'application/json',
+    'Qcc-Version': '1.0.0',
+    'authMini': 'Bearer f51dae1a2fcb109fa9ec58bd4a85e5c5',
+    'xweb_xhr': '1',
+    'xcx-version': '2023.09.27',
+    'Qcc-Platform': 'mp-weixin',
+    'Qcc-CurrentPage': '/company-subpackages/business/index',
+    'Qcc-Timestamp': '1696661787803',
+    'Qcc-RefPage': '/company-subpackages/detail/index',
+    'Accept': '*/*',
+    'Sec-Fetch-Site': 'cross-site',
+    'Sec-Fetch-Mode': 'cors',
+    'Sec-Fetch-Dest': 'empty',
+    'Referer': 'https://servicewechat.com/wx395200814fcd7599/307/page-frame.html',
+    'Accept-Encoding': 'gzip, deflate, br',
+    'Accept-Language': 'zh-CN,zh'
+}

 # Look up the QCC company id by enterprise name or credit code
 def find_id_by_name(start,token,name):
     urllib3.disable_warnings()
@@ -32,8 +54,8 @@ def find_id_by_name(start,token,name):
         try:
             resp_dict = requests.get(url=url, headers=headers, verify=False).json()
             break
-        except:
-            print('重试')
+        except Exception as e:
+            print(f'{e}-------------重试')
             time.sleep(5)
             continue
     time.sleep(2)
@@ -46,19 +68,23 @@ def find_id_by_name(start,token,name):
         KeyNo = False
         log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
         return KeyNo
+    if resp_dict['status']==40102:
+        KeyNo = False
+        log.info(f'=======无效的session=====时间{baseCore.getTimeCost(start, time.time())}')
+        return KeyNo
     try:
         if resp_dict['result']['Result']:
             result_dict = resp_dict['result']['Result'][0]
             KeyNo = result_dict['KeyNo']
             Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
             if Name == '':
-                KeyNo = ''
+                KeyNo = 'null'
         else:
-            KeyNo = ''
+            KeyNo = 'null'
     except:
         KeyNo = False
         log.info(f'====token失效====时间{baseCore.getTimeCost(start,time.time())}')
         return KeyNo
-    print("{},企业代码为:{}".format(qcc_key, KeyNo))
+    log.info("{},企业代码为:{}".format(qcc_key, KeyNo))
     return KeyNo
\ No newline at end of file
title dujiaoshoubaseinfo
call activate
call conda activate zzsn@3.8.0
python baseinfo_dujiaoshou.py
pause
\ No newline at end of file
# -*- coding: utf-8 -*-
import time
from urllib.parse import quote

import requests
import urllib3
from BaseCore import BaseCore

baseCore = BaseCore()
log = baseCore.getLogger()

# headers = {
#     'Host': 'xcx.qcc.com',
#     'Connection': 'keep-alive',
#     'Qcc-Platform': 'mp-weixin',
#     'Qcc-Timestamp': '',
#     'Qcc-Version': '1.0.0',
#     'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
#     'content-type': 'application/json',
#     'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
#     'Accept-Encoding': 'gzip, deflate, br,'
# }
headers = {
    'Host': 'xcx.qcc.com',
    'Connection': 'keep-alive',
    'x-request-device-type': 'Android',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8391',
    'Content-Type': 'application/json',
    'Qcc-Version': '1.0.0',
    'authMini': 'Bearer f51dae1a2fcb109fa9ec58bd4a85e5c5',
    'xweb_xhr': '1',
    'xcx-version': '2023.09.27',
    'Qcc-Platform': 'mp-weixin',
    'Qcc-CurrentPage': '/company-subpackages/business/index',
    'Qcc-Timestamp': '1696661787803',
    'Qcc-RefPage': '/company-subpackages/detail/index',
    'Accept': '*/*',
    'Sec-Fetch-Site': 'cross-site',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Dest': 'empty',
    'Referer': 'https://servicewechat.com/wx395200814fcd7599/307/page-frame.html',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'zh-CN,zh'
}

# Look up the QCC company id by enterprise name or credit code
def find_id_by_name(start, token, name):
    urllib3.disable_warnings()
    qcc_key = name
    t = str(int(time.time()) * 1000)
    headers['Qcc-Timestamp'] = t
    url = f"https://xcx.qcc.com/mp-weixin/forwardApp/v3/base/advancedSearch?token={token}&t={t}&pageIndex=1&needGroup=yes&insuredCntStart=&insuredCntEnd=&startDateBegin=&startDateEnd=&registCapiBegin=&registCapiEnd=&countyCode=&province=&sortField=&isSortAsc=&searchKey={quote(qcc_key)}&searchIndex=default&industryV3="
    for lll in range(1, 6):
        try:
            resp_dict = requests.get(url=url, headers=headers, verify=False).json()
            break
        except Exception as e:
            print(f'{e}-------------重试')
            time.sleep(5)
            continue
    time.sleep(2)
    # {'status': 40101, 'message': '无效的sessionToken!'} {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'}
    if resp_dict['status'] == 40101:
        KeyNo = False
        log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    if resp_dict['status'] == 401:
        KeyNo = False
        log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    if resp_dict['status'] == 40102:
        KeyNo = False
        log.info(f'=======无效的session=====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    try:
        if resp_dict['result']['Result']:
            result_dict = resp_dict['result']['Result'][0]
            KeyNo = result_dict['KeyNo']
            Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
            if Name == '':
                KeyNo = 'null'
        else:
            KeyNo = 'null'
    except:
        KeyNo = False
        log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    log.info("{},企业代码为:{}".format(qcc_key, KeyNo))
    return KeyNo
\ No newline at end of file
@@ -7,16 +7,17 @@ import requests
 import json
 from kafka import KafkaProducer
-from base.BaseCore import BaseCore
+from BaseCore import BaseCore
 from getQccId import find_id_by_name

 baseCore = BaseCore()
 cnx_ = baseCore.cnx
 cursor_ = baseCore.cursor
 log = baseCore.getLogger()
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

 # Get basic enterprise info by QCC id
-def info_by_id(com_id,com_name,gpdm):
+def info_by_id(com_id,com_name):
     aa_dict_list = []
     t = str(int(time.time()) * 1000)
@@ -31,7 +32,7 @@ def info_by_id(com_id,com_name):
         result_dict = resp_dict['result']['Company']
     except:
         log.info(com_name + ":获取失败===========重新放入redis")
-        baseCore.rePutIntoR('EnterpriseIpo:nq_gpdm',gpdm)
+        baseCore.rePutIntoR('global100:baseinfo',com_name)
         return aa_dict_list
     company_name = result_dict['Name']
@@ -306,12 +307,12 @@ def info_by_id(com_id,com_name):
     }
     aa_dict_list.append(aa_dict)
-    print(company_name + ":爬取完成")
+    log.info(company_name + ":爬取完成")
     return aa_dict_list

 if __name__ == '__main__':
-    taskType = '基本信息/企查查'
+    taskType = '基本信息/企查查/中国100强'
     headers = {
         'Host': 'xcx.qcc.com',
         'Connection': 'keep-alive',
@@ -323,81 +324,110 @@ if __name__ == '__main__':
         'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
         'Accept-Encoding': 'gzip, deflate, br,'
     }
+    list_weicha = []
+    name_list = []
     # Pull data from redis
     while True:
         # TODO: the token must be re-captured every two hours or so; it is fetched from the database
-        token = '83a9a9be4e9ecf3a8f8a20364227dc5d'
-        list_weicha = []
-        list_all_info = []
-        name_list = []
+        token = baseCore.GetToken()
+        dataList = []
+        if token:
+            pass
+        else:
+            log.info('==========已无token==========')
+            time.sleep(30)
+            continue
+        # list_all_info = []
         start_time = time.time()
         # Fetch the enterprise info
-        com_code = baseCore.redicPullData('EnterpriseIpoqccid:nq_gpdm')
-        if '.NQ' in com_code:
-            com_code1 = com_code
-        else:
-            com_code1 = com_code + '.NQ'
-        company_id = find_id_by_name(start_time,token,com_code1)
-        if not company_id:
-            log.info(com_code + ":企业ID获取失败===重新放入redis")
-            list_weicha.append(com_code + ":企业ID获取失败")
-            baseCore.rePutIntoR('EnterpriseIpoqccid:nq_gpdm',com_code)
-            log.info('-----已重新放入redis-----')
-            time.sleep(20)
-            continue
-        else:
-            log.info(f'====={com_code}===={company_id}=====获取企业id成功=====')
-            # todo: write the QCC id into the gpdm table
-            updateSql = f"update gpdm set QCCID = '{company_id}' where gpdm = '{com_code}'"
-            cursor_.execute(updateSql)
-            cnx_.commit()
-        # try:
-        #     post_data_list = info_by_id(company_id, '',com_code)
-        # except:
-        #     log.info(f'====={com_code}=====获取基本信息失败,重新放入redis=====')
-        #     baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_social_code', com_code)
-        #     continue
-        # if post_data_list:
-        #     pass
-        # else:
-        #     log.info(f'======{com_code}====企查查token失效====')
-        #     time.sleep(20)
-        #     continue
-        # for post_data in post_data_list:
-        #     list_all_info.append(post_data)
-        #     if post_data is None:
-        #         print(com_code + ":企业信息获取失败")
-        #         list_weicha.append(com_code + ":企业信息获取失败")
-        #         continue
-        #     get_name = post_data['name']
-        #     get_socialcode = post_data['socialCreditCode']
-        #     name_compile = {
-        #         'yuan_name':com_code,
-        #         'get_name':get_name
-        #     }
-        #     name_list.append(name_compile)
-        #
-        #     log.info(f'采集{com_code}成功=======耗时{baseCore.getTimeCost(start_time,time.time())}')
-        #     try:
-        #         producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
-        #         kafka_result = producer.send("regionInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
-        #         print(kafka_result.get(timeout=10))
-        #     except:
-        #         exception = 'kafka传输失败'
-        #         state = 0
-        #         takeTime = baseCore.getTimeCost(start_time, time.time())
-        #         baseCore.recordLog(get_socialcode, taskType, state, takeTime, '', exception)
-        #         log.info(f"{get_name}--{get_socialcode}--kafka传输失败")
-        # # After collection, update this enterprise's crawl count
-        # nowtime = baseCore.getNowTime(1).replace('-','_')[:10]
-        # companyName = pd.DataFrame(name_list)
-        # companyName.to_excel(f'./data/企业名称对比_{nowtime}.xlsx',index=False)
-        # false_com = pd.DataFrame(list_weicha)
-        # false_com.to_excel(f'./data/采集失败企业名单_{nowtime}.xlsx',index=False)
+        social_code = baseCore.redicPullData('global100:baseinfo')
+        # com_name = '卓新市万达铸业有限公司'
+        if social_code == '' or social_code is None:
+            time.sleep(20)
+            continue
+        if '搜索不到' in social_code:
+            continue
+        else:
+            pass
+        dic_info = baseCore.getInfomation(social_code)
+        log.info(f'----当前企业{social_code}--开始处理---')
+        com_name = dic_info[1]
+        # QCC id
+        company_id = dic_info[3]
+        # If there is no credit code, search by name; otherwise search by credit code
+        if company_id == None or company_id == False:
+            if social_code:
+                company_id = find_id_by_name(start_time,token,social_code)
+            else:
+                company_id = find_id_by_name(start_time,token,com_name)
+        if company_id == 'null':
+            log.info('=====搜索不到该企业====')
+            # todo: companies that cannot be found have no credit code, so they cannot be forwarded; generate one
+            baseCore.rePutIntoR('global100:baseinfo', social_code + ':搜索不到')
+            continue
+        if not company_id:
+            log.info(com_name + ":企业ID获取失败===重新放入redis")
+            list_weicha.append(com_name + ":企业ID获取失败")
+            baseCore.rePutIntoR('global100:baseinfo',com_name)
+            baseCore.delete_token(token)
+            log.info('=====已重新放入redis,失效token已删除======')
+            time.sleep(20)
+            continue
+        else:
+            log.info(f'====={com_name}===={company_id}=====获取企业id成功=====')
+            # todo: write to the database
+            updateqccid = f"update Global100 set qccid = '{company_id}' where CompanyName = '{com_name}'"
+            cursor_.execute(updateqccid)
+            cnx_.commit()
+        try:
+            post_data_list = info_by_id(company_id, com_name)
+        except:
+            log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
+            baseCore.rePutIntoR('global100:baseinfo', com_name)
+            baseCore.delete_token(token)
+            log.info('=====已重新放入redis,失效token已删除======')
+            continue
+        if post_data_list:
+            pass
+        else:
+            # log.info(f'======{social_code}====企查查token失效====')
+            time.sleep(20)
+            continue
+        for post_data in post_data_list:
+            # list_all_info.append(post_data)
+            if post_data is None:
+                print(com_name + ":企业信息获取失败")
+                list_weicha.append(com_name + ":企业信息获取失败")
+                continue
+            get_name = post_data['name']
+            # get_socialcode = post_data['socialCreditCode']
+            # todo: update the credit code into the table
+            # updatesocialcode = f"update Global100 set SocialCode = '{get_socialcode}' where CompanyName = '{com_name}'"
+            # cursor_.execute(updatesocialcode)
+            # cnx_.commit()
+            name_compile = {
+                'yuan_name':com_name,
+                'get_name':get_name
+            }
+            name_list.append(name_compile)
+            # dataList.append(post_data)
+            baseCore.writerToExcel(name_list,'跨国公司100大.xlsx')
+            log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time,time.time())}')
+            try:
+                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
+                kafka_result = producer.send("regionInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
+                print(kafka_result.get(timeout=10))
+            except:
+                exception = 'kafka传输失败'
+                state = 0
+                takeTime = baseCore.getTimeCost(start_time, time.time())
+                baseCore.recordLog(get_socialcode, taskType, state, takeTime, '', exception)
+                log.info(f"{get_name}--{get_socialcode}--kafka传输失败")
+
+            # break
......
from obs import ObsClient
from base.BaseCore import BaseCore

baseCore = BaseCore()
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_

# Create the ObsClient
obs_client = ObsClient(access_key_id='VEHN7D0TJ9316H8AHCAV', secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', server='https://obs.cn-north-1.myhuaweicloud.com')

def delete(object_keys):
    # The list of object keys to delete
    bucket_name = 'zzsn'
    # object_keys = ['QYNotice/8921b4b0-7853-11ee-bcc0-000c29312880.pdf']
    # Delete the files one by one
    for object_key in object_keys:
        resp = obs_client.deleteObject(bucket_name, object_key)
        if resp.status >= 200 and resp.status < 300:
            print(f"文件 {object_key} 删除成功!")
        else:
            print(f"文件 {object_key} 删除失败! 错误码:{resp.errorCode},错误信息:{resp.errorMessage}")
    # Close the ObsClient
    obs_client.close()

if __name__=='__main__':
    query = "SELECT object_key FROM clb_sys_attachment WHERE type_id=8 AND source = '证监会' AND create_time >= '2023-10-30 16:46:09' AND create_time <= '2023-11-01 09:11:12'"
    cursor_.execute(query)
    results = cursor_.fetchall()
    object_keys = [item[0] for item in results]
    delete(object_keys)
\ No newline at end of file
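Since the key list comes straight from a time-window query, a dry-run pass before the destructive loop is cheap insurance. A sketch wrapping the same deleteObject call; the dry_run flag is an assumption, not in the commit:

# Hypothetical variant of delete() above with a dry-run guard.
def delete_with_guard(object_keys, dry_run=True):
    bucket_name = 'zzsn'
    for object_key in object_keys:
        if dry_run:
            print(f"would delete {object_key}")  # inspect before flipping dry_run to False
            continue
        resp = obs_client.deleteObject(bucket_name, object_key)
        if 200 <= resp.status < 300:
            print(f"文件 {object_key} 删除成功!")
        else:
            print(f"文件 {object_key} 删除失败! 错误码:{resp.errorCode},错误信息:{resp.errorMessage}")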
import json
import json import json
import re import re
import time import time
import uuid
from datetime import datetime
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from kafka import KafkaProducer from kafka import KafkaProducer
from retry import retry
from base import BaseCore from base import BaseCore
from obs import ObsClient from obs import ObsClient
import fitz import fitz
...@@ -24,6 +30,11 @@ obsClient = ObsClient( ...@@ -24,6 +30,11 @@ obsClient = ObsClient(
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址 server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
) )
pathType = 'QYNotice/'
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
#获取文件大小 #获取文件大小
def convert_size(size_bytes): def convert_size(size_bytes):
...@@ -44,26 +55,28 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -44,26 +55,28 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers['User-Agent'] = baseCore.getRandomUserAgent() headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3): for i in range(0, 3):
try: try:
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20) response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length')) file_size = int(response.headers.get('Content-Length'))
break break
except: except:
time.sleep(3) time.sleep(3)
continue continue
page_size = 0 page_size = 0
for i in range(0, 3): name = str(getuuid()) + '.pdf'
try: now_time = time.strftime("%Y-%m")
name = pdf_name try:
now_time = time.strftime("%Y-%m") result = getOBSres(pathType, name, response)
result = obsClient.putContent('zzsn', 'QYNotice/'+name, content=response.content) except:
with fitz.open(stream=response.content, filetype='pdf') as doc: log.error(f'OBS发送失败')
page_size = doc.page_count return retData
for page in doc.pages(): try:
retData['content'] += page.get_text() with fitz.open(stream=response.content, filetype='pdf') as doc:
break page_size = doc.page_count
except: for page in doc.pages():
time.sleep(3) retData['content'] += page.get_text()
continue except:
log.error(f'文件损坏')
return retData
if page_size < 1: if page_size < 1:
# pdf解析失败 # pdf解析失败
...@@ -73,8 +86,8 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -73,8 +86,8 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
try: try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True retData['state'] = True
retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1]) retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = unquote(result['body']['objectUrl']) retData['full_path'] = result['body']['objectUrl']
retData['file_size'] = convert_size(file_size) retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now retData['create_time'] = time_now
retData['page_size'] = page_size retData['page_size'] = page_size
...@@ -86,15 +99,21 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -86,15 +99,21 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
return retData return retData
def secrchATT(item_id, name, type_id,order_by): @retry(tries=3, delay=1)
sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s ''' def getOBSres(pathType,name, response):
cursor_.execute(sel_sql, (item_id, name, type_id,order_by)) result = obsClient.putContent('zzsn', pathType + name, content=response.content)
select = cursor_.fetchall() # resp = obsClient.putFile('zzsn', pathType + name, file_path='要上传的那个文件的本地路径')
selects = select[-1] return result
def secrchATT(item_id, retData, type_id,order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
cursor_.execute(sel_sql, (item_id, retData['path'], type_id,order_by))
selects = cursor_.fetchone()
return selects return selects
# 插入到att表 返回附件id # 插入到att表 返回附件id
def tableUpdate(retData, com_name, year, pdf_name, num): def tableUpdate(retData, com_name, year, pdf_name, num,pub_time,origin):
item_id = retData['item_id'] item_id = retData['item_id']
type_id = retData['type_id'] type_id = retData['type_id']
group_name = retData['group_name'] group_name = retData['group_name']
...@@ -115,18 +134,19 @@ def tableUpdate(retData, com_name, year, pdf_name, num): ...@@ -115,18 +134,19 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
# return id # return id
# else: # else:
try: try:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = ( values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by, status, create_by,
create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn') create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
pub_time, origin)
cursor_.execute(Upsql, values) # insert cursor_.execute(Upsql, values) # insert
cnx_.commit() # commit cnx_.commit() # commit
except Exception as e: except Exception as e:
print(e) print(e)
log.info(f"更新完成:{item_id}===={pdf_name}") log.info(f"更新完成:{item_id}===={pdf_name}")
selects = secrchATT(item_id, pdf_name, type_id,order_by) selects = secrchATT(item_id, retData, type_id,order_by)
id = selects[0] id = selects[0]
return id return id
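Looking attachments up by path instead of name (and with fetchone instead of select[-1]) presumably lets repeated uploads of the same object resolve to a single row even when display names differ, and avoids the IndexError that select[-1] raised when no row matched.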
...@@ -300,7 +320,8 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -300,7 +320,8 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
log.info(f'====PDF parsing failed====') log.info(f'====PDF parsing failed====')
return False return False
num = num + 1 num = num + 1
att_id = tableUpdate(retData,com_name,year,pdf_name,num) origin = '证监会'
att_id = tableUpdate(retData,com_name,year,pdf_name,num,pub_time,origin)
if att_id: if att_id:
pass pass
else: else:
...@@ -318,7 +339,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -318,7 +339,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
'id': '', 'id': '',
'keyWords': '', 'keyWords': '',
'lang': 'zh', 'lang': 'zh',
'origin': '证监会', 'origin': origin,
'publishDate': pub_time, 'publishDate': pub_time,
'sid': '1684032033495392257', 'sid': '1684032033495392257',
'sourceAddress': pdf_url, # 原文链接 'sourceAddress': pdf_url, # 原文链接
...@@ -332,7 +353,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -332,7 +353,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
# Send the relevant fields to Kafka for persistence # Send the relevant fields to Kafka for persistence
try: try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092']) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopicaaaas", json.dumps(dic_news, ensure_ascii=False).encode('utf8')) kafka_result = producer.send("researchReportTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10)) print(kafka_result.get(timeout=10))
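producer.send() is asynchronous: the .get(timeout=10) call on the returned future blocks until the broker acknowledges the write, or raises on failure. A minimal sketch of the same pattern with explicit error handling, using the kafka-python API already imported above:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
future = producer.send('researchReportTopic', b'{"demo": true}')
try:
    meta = future.get(timeout=10)  # block until the broker acks
    print(meta.topic, meta.partition, meta.offset)
except KafkaError as e:
    print('send failed:', e)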
...@@ -360,6 +381,8 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -360,6 +381,8 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info: basic company info fetched from the database def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info: basic company info fetched from the database
social_code = dic_info[2] social_code = dic_info[2]
short_name = dic_info[4] short_name = dic_info[4]
if short_name == 'None':
short_name = dic_info[1]
com_name = dic_info[1] com_name = dic_info[1]
soup = RequestUrl(url, payload, social_code, start_time) soup = RequestUrl(url, payload, social_code, start_time)
...@@ -392,7 +415,7 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库 ...@@ -392,7 +415,7 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
pass pass
else: else:
Maxpage = 50 Maxpage = 50
for i in range(1,Maxpage): for i in range(1,Maxpage+1):
log.info(f'==========Crawling page {i}=========') log.info(f'==========Crawling page {i}=========')
if i == 1: if i == 1:
href = url href = url
...@@ -413,17 +436,22 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库 ...@@ -413,17 +436,22 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'') pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[1].strip('\'') + '.pdf' name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[1].strip('\'') + '.pdf'
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'') pub_time_ = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'')
#todo: check that the publish date is a valid date format #todo: check that the publish date is a valid date format
pattern = r"^\d{4}-\d{2}-\d{2}$" # regex for dates in YYYY-MM-DD format pattern = r"^\d{4}-\d{2}-\d{2}$" # regex for dates in YYYY-MM-DD format
date_time_pattern = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}" date_time_pattern = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
if re.match(pattern, pub_time): if re.match(pattern, pub_time_):
pass pass
else: else:
if re.match(date_time_pattern, pub_time): if re.match(date_time_pattern, pub_time_):
pass pass
else: else:
continue continue
# Normalize the publish date to a full timestamp: date-only values get 00:00:00
# appended; values that already carry a time component are kept as-is
if re.match(pattern, pub_time_):
    date_object = datetime.strptime(pub_time_, "%Y-%m-%d")
    pub_time = date_object.strftime("%Y-%m-%d %H:%M:%S")
else:
    pub_time = pub_time_
year = pub_time[:4] year = pub_time[:4]
report_type = td_list[4].text.strip() report_type = td_list[4].text.strip()
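For illustration, the normalization above maps hypothetical inputs like this:

from datetime import datetime

for raw in ('2023-08-15', '2023-08-15 09:30:00'):
    if len(raw) == 10:  # date-only: pad to a full timestamp
        print(raw, '->', datetime.strptime(raw, '%Y-%m-%d').strftime('%Y-%m-%d %H:%M:%S'))
    else:
        print(raw, '->', raw)
# 2023-08-15 -> 2023-08-15 00:00:00
# 2023-08-15 09:30:00 -> 2023-08-15 09:30:00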
...@@ -513,6 +541,8 @@ if __name__ == '__main__': ...@@ -513,6 +541,8 @@ if __name__ == '__main__':
# Stock codes starting with 0, 2 or 3 belong to the Shenzhen exchange; 6 or 9 to Shanghai; 4 or 8 to Beijing # Stock codes starting with 0, 2 or 3 belong to the Shenzhen exchange; 6 or 9 to Shanghai; 4 or 8 to Beijing
code = dic_info[3] code = dic_info[3]
short_name = dic_info[4] short_name = dic_info[4]
if short_name == 'None':
short_name = dic_info[1]
com_name = dic_info[1] com_name = dic_info[1]
dic_parms = getUrl(code, url_parms, Catagory2_parms) dic_parms = getUrl(code, url_parms, Catagory2_parms)
dic_parms_ls = getUrl(code, url_parms_ls, Catagory2_parms_ls) dic_parms_ls = getUrl(code, url_parms_ls, Catagory2_parms_ls)
......
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -56,7 +56,7 @@ if __name__=="__main__": ...@@ -56,7 +56,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/" url = "https://mp.weixin.qq.com/"
browser.get(url) browser.get(url)
# adjustable # adjustable
time.sleep(20) time.sleep(40)
s = requests.session() s = requests.session()
# get the token and cookies # get the token and cookies
......
...@@ -49,6 +49,7 @@ def getSourceInfo(infoSourceCode): ...@@ -49,6 +49,7 @@ def getSourceInfo(infoSourceCode):
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' " sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
cursor.execute(sql) cursor.execute(sql)
row = cursor.fetchone() row = cursor.fetchone()
cnx.commit()
dic_url = { dic_url = {
'url_': row[0], 'url_': row[0],
'sid': row[1], 'sid': row[1],
...@@ -143,6 +144,7 @@ def updateCookieToken(token,cookies): ...@@ -143,6 +144,7 @@ def updateCookieToken(token,cookies):
def getToken(): def getToken():
cursor_.execute(f"select token,cookies from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1") cursor_.execute(f"select token,cookies from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
row = cursor_.fetchall() row = cursor_.fetchall()
cnx_.commit()
if row: if row:
pass pass
else: else:
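The cnx.commit() added after each SELECT above is deliberate: under MySQL's default REPEATABLE READ isolation a long-lived connection keeps reading from the same snapshot, so committing ends the implicit transaction and lets the next query see rows written by other processes in the meantime.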
...@@ -159,7 +161,7 @@ def getPageData(dic_url,page): ...@@ -159,7 +161,7 @@ def getPageData(dic_url,page):
info_source_code = dic_url['info_source_code'] info_source_code = dic_url['info_source_code']
biz = dic_url['biz'] biz = dic_url['biz']
fakeid = biz + '==' fakeid = biz + '=='
tokenAndCookie = getToken() tokenAndCookie = getToken()
if tokenAndCookie: if tokenAndCookie:
pass pass
else: else:
...@@ -258,7 +260,7 @@ def getFromSql(): ...@@ -258,7 +260,7 @@ def getFromSql():
cnx.commit() cnx.commit()
result_list = [item[0] for item in results] result_list = [item[0] for item in results]
# push into redis # push into redis
for item in result_list: for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item) r.rpush('WeiXinGZH:infoSourceCode', item)
if __name__=="__main__": if __name__=="__main__":
......
import json
"""
Elasticsearch 安装
pip install elasticsearch==7.8.1 版本的
使用时参考文章
https://blog.csdn.net/yangbisheng1121/article/details/128528112
https://blog.csdn.net/qiuweifan/article/details/128610083
"""
from elasticsearch import Elasticsearch
class EsMethod(object):
def __init__(self):
# Create the Elasticsearch client with credentials
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'),timeout=300 )
self.index_name='researchreportdata'
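Note: the doc_type='_doc' argument passed in the calls below is deprecated in Elasticsearch 7.x (indices are single-type); elasticsearch-py 7.8 still accepts it, but it can simply be dropped.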
'''
Fuzzy query
# Query method: fuzzy match (the query text is analyzed into terms).
# e.g. "我爱你中国" also matches content that contains only "我爱你" or "中国"
'''
def match(self,index_name,pnum):
# A match query only accepts a single field, so the three conditions
# are combined with a bool/must
body = {
    'query': {
        'bool': {
            'must': [
                {'match': {'title': '.pdf'}},
                {'match': {'origin': '雪球网'}},
                {'match': {'type': '1'}},
            ]
        }
    },
    'from': pnum,
    'size': 20,
}
filter_path=['hits.hits._source.title', # field 1
'hits.hits._source.id'] # field 2
result = self.es.search(index=index_name
,doc_type='_doc'
,filter_path = filter_path
,body=body)
print(result)
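filter_path is forwarded to the REST API's filter_path parameter, so Elasticsearch prunes the response to the listed dot-paths before returning it; only the title and id of each hit come back instead of the full hit payload.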
'''
Phrase query
# Query method: phrase match (the query text is not split into terms);
# only matches content containing the whole phrase, e.g. "我爱你中国"
'''
def match_phrase(self,index_name):
body = {
'query':{
'match_phrase':{
'm_ext1' : 'XXXXXX' #keyword
}
}
}
filter_path=['hits.hits._source.title', # field 1
'hits.hits._source.id',
'hits.hits._source.sourceAddress',
'hits.hits._source.publishDate'
] # field 2
result = self.es.search(index=index_name
,doc_type='_doc'
,filter_path = filter_path
,body=body)
print(result)
'''
Exact-match (term) query
'''
def term(self,index_name):
body = {
'query':{
'term':{
'm_slhm' : 'XXXXXX'
}
}
}
filter_path=['hits.hits._source.m_ext1', # field 1
'hits.hits._source.m_ext2'] # field 2
result = self.es.search(index=index_name
,doc_type='_doc'
,filter_path = filter_path
,body=body)
print(result)
'''
Exact-match query over multiple values (terms)
'''
def terms(self,index_name):
body = {
'query':{
'terms':{
'm_slhm' : ['13XXXXXX ','13XXXXXX']
}
}
}
filter_path=['hits.hits._source.m_ext1', # field 1
'hits.hits._source.m_slhm'] # field 2
result = self.es.search(index=index_name
,doc_type='_doc'
,filter_path = filter_path
,body=body)
print(result)
'''
Multi-condition AND query (bool/must)
'''
def multi_must(self,index_name):
body = {
'query': {
'bool': {
'must':[
{'term':{'m_slhm' : '13XXXXXXX'}},
{'terms':{'m_slhm' : ['13XXXXXX']}},
]
}
}
}
filter_path=['hits.hits._source.m_ext1', # field 1
'hits.hits._source.m_slhm'] # field 2
result = self.es.search(index=index_name
,doc_type='_doc'
,filter_path = filter_path
,body=body)
print(result)
'''
Update
'''
def update(self,index_name):
# a partial update must wrap the new field values in a 'doc' object
result = self.es.update(index=index_name
,id='20220901-XXXXX'
,body={'doc': {'serialno': 'XXXXXX'}})
print('Update result: %s' % result)
'''
Create (index a new document)
'''
def add(self,index_name):
result = self.es.index(index=index_name
,id='20220901-XXXXXX'
,body={'serialno': 'XXXXXX' })
print('Index result: %s' % result)
'''
Delete
'''
def delete(self,index_name,id):
result = self.es.delete(index=index_name
,doc_type="_doc"
,id=id)
print('Delete result: %s' % result)
'''
Multi-condition OR query (bool/should)
'''
def multi_should(self,index_name,pnum):
body = {
'query': {
'bool': {
'should':[
{'term':{'labels.relationId' : '91110108740053589U'}},
{'term':{'type' : 3}},
],
# 'must': [
# {'match': {'title': '.pdf'}}
# ]
}
},
'from' : pnum,
'size' : 600,
}
# NOTE: this second assignment replaces the bool/should body above;
# the nested + term query below is the one actually executed
body = {
"query": {
"bool": {
"must": [
{
"nested": {
"path": "labels",
"query": {
"match": {
"labels.relationId": "91110108740053589U"
}
}
}
},
{
"term": {
"type.keyword": {
"value": "3"
}
}
}
]
}
},
"sort": [
{
"publishDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 212
}
filter_path=['hits.hits._source.title', # field 1
'hits.hits._source.id',
'hits.total.value',
] # field 2
result = self.es.search(index=index_name
,doc_type='_doc'
,filter_path = filter_path
,body=body)
print(result)
return result
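The nested query above only matches if labels is mapped with type nested; with a plain object mapping, a flat match on labels.relationId would be needed instead. An illustrative mapping fragment under that assumption:

labels_mapping = {
    'properties': {
        'labels': {
            'type': 'nested',
            'properties': {
                'relationId': {'type': 'keyword'}
            }
        }
    }
}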
'''
Update
'''
def updateaunn(self,index_name,id,utitle):
body = {
'doc': {
'title': utitle
}
}
result = self.es.update(index=index_name
,id=id
,body=body)
print('Update result: %s' % result)
def getFileds(self,index_name):
mapping = self.es.indices.get_mapping(index=index_name)
fields = mapping[index_name]['mappings']['properties'].keys()
print(fields)
if __name__ == '__main__':
esMethod=EsMethod()
# esMethod.getFileds(index_name=esMethod.index_name)
num=1
for pnum in range(0,num):
p=pnum*20
print(f'Page {pnum}')
result=esMethod.multi_should(index_name=esMethod.index_name,pnum=p)
msglist=result['hits']['hits']
# print(msglist)
for mms in msglist:
id=mms['_source']['id']
title=mms['_source']['title']
utitle=title.replace('.pdf','')
print(f'id:{id}---title:{title}--utitle:{utitle}')
# esMethod.updateaunn(esMethod.index_name,str(id),utitle)
esMethod.delete(esMethod.index_name,str(id))
print('Deleted successfully!')
import json
from elasticsearch import Elasticsearch
# Create the Elasticsearch client with credentials
es = Elasticsearch(
['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988') # username and password
)
index_name = 'basedata'
# Search for documents
search_query = {
"query": {
"match": {
"sourceAddress": "www"
}
}
}
# search_query=json.dumps(search_query)
res = es.search(index=index_name, body=search_query)
for hit in res['hits']['hits']:
print(hit['_source'])
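By default this search returns only the top 10 hits. A sketch for paging through the full result set with from/size, reusing the es client and search_query defined above (the page size is an arbitrary choice; from + size is capped at index.max_result_window, 10000 by default):

page_size = 100
offset = 0
while True:
    res = es.search(index=index_name,
                    body={**search_query, 'from': offset, 'size': page_size})
    hits = res['hits']['hits']
    if not hits:
        break
    for hit in hits:
        print(hit['_source'])
    offset += page_size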