Commit 3ed701a1   Author: 薛凌堃

12/18

Parent 1c3b0cbf
@@ -162,34 +162,34 @@ def NoticeDF():
        else:
            continue
-    # 获取港股企业
-    om_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 And SecuritiesCode like '%.HK'"
-    cursor.execute(om_gg_query)
-    cnx.commit()
-    om_gg_result = cursor.fetchall()
-    om_gg_social_list = [item[0] for item in om_gg_result]
-    print('欧盟港股企业=======')
-    for item in om_gg_social_list:
-        r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
-
-    fq_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
-    cursor.execute(fq_gg_query)
-    cnx.commit()
-    fq_gg_result = cursor.fetchall()
-    fq_gg_social_list = [item[0] for item in fq_gg_result]
-    print('500强港股企业=======')
-    for item in fq_gg_social_list:
-        r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
-
-    fbs_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
-    cursor.execute(fbs_gg_query)
-    cnx.commit()
-    fbs_gg_result = cursor.fetchall()
-    fbs_gg_social_list = [item[0] for item in fbs_gg_result]
-    print('500强港股企业=======')
-    for item in fbs_gg_social_list:
-        r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
-    closeSql(cnx, cursor)
+    # # 获取港股企业
+    # om_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 And SecuritiesCode like '%.HK'"
+    # cursor.execute(om_gg_query)
+    # cnx.commit()
+    # om_gg_result = cursor.fetchall()
+    # om_gg_social_list = [item[0] for item in om_gg_result]
+    # print('欧盟港股企业=======')
+    # for item in om_gg_social_list:
+    #     r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
+    #
+    # fq_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
+    # cursor.execute(fq_gg_query)
+    # cnx.commit()
+    # fq_gg_result = cursor.fetchall()
+    # fq_gg_social_list = [item[0] for item in fq_gg_result]
+    # print('500强港股企业=======')
+    # for item in fq_gg_social_list:
+    #     r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
+    #
+    # fbs_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
+    # cursor.execute(fbs_gg_query)
+    # cnx.commit()
+    # fbs_gg_result = cursor.fetchall()
+    # fbs_gg_social_list = [item[0] for item in fbs_gg_result]
+    # print('500强港股企业=======')
+    # for item in fbs_gg_social_list:
+    #     r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
+    # closeSql(cnx, cursor)
@@ -645,13 +645,13 @@ if __name__ == "__main__":
    # shuangbaiqiye()
    # zhuangjingtexind()
    # NoticeEnterprise()
-    # NoticeDF()
+    NoticeDF()
    # AnnualEnterpriseIPO()
    # AnnualEnterprise()
    # BaseInfoEnterprise()
    # BaseInfoEnterpriseAbroad()
    # NewsEnterprise_task()
-    NewsEnterprise()
+    # NewsEnterprise()
    # CorPerson()
    # china100()
    # global100()
......
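For orientation: NoticeDF() and the other task functions toggled in the main block above only enqueue work — each social credit code is pushed onto a Redis list that a separate worker later pops. A minimal sketch of that producer/consumer pattern, using the queue name from the hunk above; the connection settings and the consumer side are assumptions, not taken from this repo:

import redis

# producer side (what NoticeDF does): push each social credit code onto a Redis list
r = redis.Redis(host='127.0.0.1', port=6379, db=6)        # host/port/db are placeholders
for social_code in ['91110000XXXXXXXXXX', '91440000XXXXXXXXXX']:
    r.rpush('NoticeEnterprise:ggqy_socialCode_add', social_code)

# consumer side (what the downstream spiders do): pop one code per loop, sleep when empty
item = r.lpop('NoticeEnterprise:ggqy_socialCode_add')
if item is not None:
    social_code = item.decode()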
-#企业动态 从redis中获取数据
+# 企业动态 从redis中获取数据
import json
import os
import random
@@ -11,6 +11,7 @@ import sys
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
# from base.BaseCore import BaseCore
# from base.smart import smart_extractor
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
@@ -21,7 +22,7 @@ import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
jieba.cut("必须加载jieba")
# 初始化,设置中文分词
-smart =smart_extractor.SmartExtractor('cn')
+smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
@@ -42,43 +43,43 @@ headers = {
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
-    'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxODcwMzc1MjYwMCIsImlhdCI6MTY5OTkyNTk5NywiZXhwIjoxNzAyNTE3OTk3fQ.9iXmxFEiBdu2WYa7RwdU0xKKx7v_wBe9-QipH0TNKp9Dzk_2cZK1ESsmO1o8ICrddb5sx2cl5pjOBoaaf_9Qsg',
+    'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxODcwMzc1MjYwMCIsImlhdCI6MTcwMjcxMjg4MywiZXhwIjoxNzA1MzA0ODgzfQ.mVTR6Wz7W_IBjf4rLYhKacG9CRxGTzIGKmlqrR9jN-_t0Z4vUYVYwOTMzo7vT9IClJELruhl4d31KBHX0bZ1NQ',
    'X-TYCID': '6f6298905d3011ee96146793e725899d',
    'sec-ch-ua': '"Google Chrome";v="117", "Not;A=Brand";v="8", "Chromium";v="117"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'version': 'TYC-Web'
}
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
-taskType = '企业动态/天眼查/补采专精特新'
+taskType = '企业动态/天眼查/'

-def reqDetailmsg(url,headers):
+def reqDetailmsg(url, headers):
    # proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
-    for i in range(0,1):
+    for i in range(0, 1):
        try:
-            response=requests.get(url=url,headers=headers,timeout=8,verify=False)
+            response = requests.get(url=url, headers=headers, timeout=8, verify=False)
            response.encoding = response.apparent_encoding
-            htmltext=response.text
+            htmltext = response.text
        except Exception as e:
-            htmltext=''
+            htmltext = ''
            log.info(f'{url}---详情请求失败--{e}')
        if htmltext:
            log.info(f'{url}---详情请求成功')
            break
    return htmltext

-def beinWork(tyc_code, social_code,start_time):
+def beinWork(tyc_code, social_code, start_time):
    time.sleep(3)
    # retData={'up_state':False,'total':0,'up_okCount':0,'up_errorCount':0,'up_repetCount':0}
    retData = {'total': 0, 'up_okCount': 0, 'up_errorCount': 0, 'up_repetCount': 0}
    t = time.time()
    url = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={t}&id={tyc_code}&ps={pageSize}&pn=1&emotion=-100&event=-100'
    try:
-        for m in range(0,3):
+        for m in range(0, 3):
            ip = baseCore.get_proxy()
            headers['User-Agent'] = baseCore.getRandomUserAgent()
            response = requests.get(url=url, headers=headers, proxies=ip, verify=False)
@@ -87,18 +88,18 @@ def beinWork(tyc_code, social_code,start_time):
            if (response.status_code == 200):
                pass
        except Exception as e:
-            #todo:重新放入redis中
-            baseCore.rePutIntoR('NoticeEnterprise:gnqy_socialCode',social_code)
+            # todo:重新放入redis中
+            baseCore.rePutIntoR('NewsResend:newsInfo', tyc_code)
            log.error(f"{tyc_code}-----获取总数接口失败")
            error = '获取总数接口失败'
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{error}----{e}')
-            #获取当前进程pid
+            # 获取当前进程pid
            current_pid = baseCore.getPID()
-            #todo: 重新启动新进程,杀死当前进程
+            # todo: 重新启动新进程,杀死当前进程
            subprocess.Popen([sys.executable] + sys.argv)
-            os.kill(current_pid,9)
+            os.kill(current_pid, 9)
            return retData
        try:
            json_1 = json.loads(response.content.decode('utf-8'))
@@ -108,7 +109,7 @@ def beinWork(tyc_code, social_code,start_time):
            e = '获取总数失败'
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
-            baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
+            baseCore.recordLog(tyc_code, taskType, state, takeTime, url, e)
            return retData
        if (total > 0):
            if (total % pageSize == 0):
@@ -125,7 +126,7 @@ def beinWork(tyc_code, social_code,start_time):
        up_okCount = 0
        up_errorCount = 0
        up_repetCount = 0
-        for num in range(1, totalPage + 1):
+        for num in range(1, totalPage+1):
            time.sleep(3)
            log.info(f"获取分页数据--{tyc_code}----分页{num}----开始")
            start_page = time.time()
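As context for the page loop above: totalPage comes from the total/pageSize remainder check shown in the previous hunk. A one-line restatement of that rounding-up logic (a sketch, variable names taken from the code):

# ceiling division: an exact multiple of pageSize needs no extra page, anything else needs one more
totalPage = total // pageSize if total % pageSize == 0 else total // pageSize + 1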
@@ -168,7 +169,7 @@ def beinWork(tyc_code, social_code,start_time):
                    source = info_page['website']
                    link = info_page['uri']
                    try:
-                        sel_sql = '''select social_credit_code from brpa_source_article_news where source_address = %s and social_credit_code=%s and type='2' '''
+                        sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
                        cursor_.execute(sel_sql, (link, social_code))
                    except Exception as e:
                        print(e)
@@ -181,20 +182,20 @@ def beinWork(tyc_code, social_code,start_time):
                        retData['up_okCount'] = up_okCount
                        retData['up_errorCount'] = up_errorCount
                        retData['up_repetCount'] = up_repetCount
                        # return retData
                        continue
                    try:
                        time_struct = time.localtime(int(info_page['rtm'] / 1000))  # 首先把时间戳转换为结构化时间
                        time_format = time.strftime("%Y-%m-%d %H:%M:%S", time_struct)  # 把结构化时间转换为格式化时间
                    except:
                        time_format = baseCore.getNowTime(1)
                    try:
                        # 开始进行智能解析
                        # lang = baseCore.detect_language(title)
                        # smart = smart_extractor.SmartExtractor(lang)
-                        #带标签正文
+                        # 带标签正文
                        contentText = smart.extract_by_url(link).text
-                        #不带标签正文
+                        # 不带标签正文
                        content = smart.extract_by_url(link).cleaned_text
                        if len(content) < 300:
                            continue
@@ -216,7 +217,7 @@ def beinWork(tyc_code, social_code,start_time):
                            pass
                        continue
                    try:
-                        insert_sql = '''insert into brpa_source_article_news(social_credit_code,source_address,origin,type,publish_time,content,create_time) values(%s,%s,%s,%s,%s,%s,now())'''
+                        insert_sql = '''insert into brpa_source_article (social_credit_code,source_address,origin,type,publish_time,content,create_time) values(%s,%s,%s,%s,%s,%s,now())'''
                        # 动态信息列表
                        up_okCount = up_okCount + 1
                        list_info = [
@@ -299,24 +300,25 @@ def beinWork(tyc_code, social_code,start_time):
    retData['up_repetCount'] = up_repetCount
    return retData

# 日志信息保存至现已创建好数据库中,因此并没有再对此前保存日志信息数据库进行保存
def doJob():
    while True:
        # 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
-        social_code = baseCore.redicPullData('NewsEnterprise:gnqybc_socialCode')
-        # social_code = '912301001275921118'
+        # tycid = baseCore.redicPullData('NewsResend:newsInfo')
+        tycid = '7944238'
        # 判断 如果Redis中已经没有数据,则等待
-        if social_code == None:
+        if tycid == None:
            time.sleep(20)
            continue
        start = time.time()
        try:
-            data = baseCore.getInfomation(social_code)
+            data = baseCore.getBYtycidInfomation(tycid)
            if len(data) != 0:
                pass
            else:
-                #数据重新塞入redis
-                baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode',social_code)
+                # 数据重新塞入redis
+                baseCore.rePutIntoR('NewsResend:newsInfo', tycid)
                continue
            id = data[0]
            xydm = data[2]
@@ -333,27 +335,27 @@ def doJob():
            elif not retData['tycData'] and retData['reput']:
                state = 0
                takeTime = baseCore.getTimeCost(start, time.time())
-                baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
-                log.info(f'======={social_code}====重新放入redis====')
-                baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode', social_code)
+                baseCore.recordLog(xydm, taskType, state, takeTime, '', '获取天眼查id失败')
+                log.info(f'======={tycid}====重新放入redis====')
+                baseCore.rePutIntoR('NewsResend:newsInfo', tycid)
                continue
            elif not retData['reput'] and not retData['tycData']:
                continue
        except:
            state = 0
            takeTime = baseCore.getTimeCost(start, time.time())
-            baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
-            baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode', social_code)
+            baseCore.recordLog(xydm, taskType, state, takeTime, '', '获取天眼查id失败')
+            baseCore.rePutIntoR('NewsResend:newsInfo', tycid)
            continue
        count = data[17]
        log.info(f"{id}---{xydm}----{tycid}----开始处理")
        start_time = time.time()
        # 开始采集企业动态
-        retData = beinWork(tycid, xydm,start_time)
+        retData = beinWork(tycid, xydm, start_time)
        # 信息采集完成后将该企业的采集次数更新
        runType = 'NewsRunCount'
        count += 1
-        baseCore.updateRun(social_code, runType, count)
+        baseCore.updateRun(xydm, runType, count)
        total = retData['total']
        up_okCount = retData['up_okCount']
        up_errorCount = retData['up_errorCount']
@@ -361,12 +363,12 @@ def doJob():
            log.info(
                f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
        except Exception as e:
-            log.info(f'==={social_code}=====获取企业信息失败====')
-            #重新塞入redis
-            baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode',social_code)
+            log.info(f'==={tycid}=====获取企业信息失败====')
+            # 重新塞入redis
+            baseCore.rePutIntoR('NewsResend:newsInfo', tycid)
            state = 0
            takeTime = baseCore.getTimeCost(start, time.time())
-            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
+            baseCore.recordLog(xydm, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
        time.sleep(5)
    cursor.close()
@@ -374,6 +376,7 @@ def doJob():
    # 释放资源
    baseCore.close()

if __name__ == '__main__':
    log.info(f'当前进程id为{baseCore.getPID()}')
    doJob()
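Several of these spiders recover from fatal request errors the same way: re-queue the task, spawn a fresh copy of the script, and kill the current process (see the subprocess.Popen/os.kill lines in the hunks above). A self-contained sketch of that restart idiom; the helper name is mine, not part of the repo:

import os
import subprocess
import sys

def restart_self():
    # launch a new interpreter running this same script with the same arguments,
    # then terminate the current process so only the fresh copy keeps working
    subprocess.Popen([sys.executable] + sys.argv)
    os.kill(os.getpid(), 9)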
@@ -31,7 +31,7 @@ class EsMethod(object):
    def __init__(self):
        # 创建Elasticsearch对象,并提供账号信息
        self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
-        self.index_name='policy'
+        self.index_name='researchreportdata'
    '''
    删除
@@ -47,7 +47,7 @@ if __name__ == "__main__":
    redis_conn = redis.Redis(connection_pool=pool)
    while True:
        # 从redis中读取数据,去附件表中根据title查询,更新查到的附件id
-        item = redis_conn.lpop('YanBao:aa')
+        item = redis_conn.lpop('YanBao:id')
        if item:
            log.info(item)
            id = item.decode()
......
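The loop above drains the YanBao:id list and then works against the researchreportdata index. A stripped-down sketch of that drain-the-queue pattern; the index name, credentials and Redis key come from the diff, while the pool settings and the es.get lookup are only illustrative (the real script goes on to match attachment records by title):

import redis
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=6)   # placeholder connection settings
redis_conn = redis.Redis(connection_pool=pool)

while True:
    item = redis_conn.lpop('YanBao:id')
    if not item:
        break                                                    # queue drained
    doc_id = item.decode()
    doc = es.get(index='researchreportdata', id=doc_id)          # fetch the research-report document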
@@ -33,33 +33,37 @@ class EsMethod(object):
    def __init__(self):
        # 创建Elasticsearch对象,并提供账号信息
        self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
-        self.index_name = 'policy'
+        self.index_name = 'researchreportdata'

    def queryatt(self,index_name,pnum):
        body = {
            "query": {
                "bool": {
                    "must": [
                        {
-                            "term": {
-                                "sid.keyword": {
-                                    "value": "1697458829758697473"
-                                }
+                            "match": {
+                                "type": "0"
                            }
                        },
                        {
                            "range": {
                                "createDate": {
-                                    "gte": "2023-11-29T10:00:00",
-                                    "lte": "2023-12-01T10:00:00"
+                                    "gte": "2023-12-13T00:00:00",
+                                    "lte": "2023-12-15T00:00:00"
                                }
                            }
                        }
                    ]
                }
            },
+            "sort": [
+                {
+                    "createDate": {
+                        "order": "desc"
+                    }
+                }
+            ],
            "track_total_hits": True,
            "size": 200,
            "from": pnum
......
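The reworked query above selects type=0 documents inside a two-day createDate window, sorts them newest first, and pages through them 200 at a time via the from offset. A sketch of the driver loop that usually sits on top of queryatt; only queryatt, the page size and the sort come from the diff, the loop itself is an assumption:

p = 0
while True:
    result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
    hits = result['hits']['hits']
    if not hits:
        break                         # past the last page
    for hit in hits:
        doc_id = hit['_id']           # process each research-report document here
    p += 200                          # the "from" offset advances by the page size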
"""
证监会企业名单
"""
import time
import random
import requests
from bs4 import BeautifulSoup
from retry import retry
from base import BaseCore
from obs import ObsClient
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
taskType = '企业名单/证监会'
def createDriver():
chrome_driver = r'D:\cmd100\chromedriver.exe'
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--start-maximized")
chrome_options.add_argument('user-agent='+'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
chrome_options.add_argument('--headless')
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
return driver
@retry(tries=3, delay=5)
def RequestUrl(url):
# ip = baseCore.get_proxy()
# proxy = {'https': 'http://127.0.0.1:8888', 'http': 'http://127.0.0.1:8888'}
response = requests.get(url=url, headers=headers)
response.encoding = response.apparent_encoding
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'lxml')
return soup
else:
raise
def browserRequest(url):
browser = createDriver()
browser.get(url)
wait = WebDriverWait(browser, 30)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "m-table2")))
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
return soup
def getUrl(url_parm):
# 深市
# 沪市
url = f'http://eid.csrc.gov.cn/{url_parm}/index_f.html'
# 北交所
return url
# 映射关系
def getmap(dic_info):
data_dic = {
'公司全称': 'company_name',
'公司简称': 'short_name',
'英文名称': 'english_name',
'股票代码': 'gpdm',
'上市板块': 'shbk',
'股票类型': 'gp_type',
'法定代表人': 'coreperson',
'注册地址': 'address',
'行业种类': 'hy_type',
'公司网址': 'website',
'上市时间': 'ipotime',
'邮政编码': 'emial_code',
'公司电话': 'phone',
}
dict3 = {value: dic_info.get(key, '') for key, value in data_dic.items()}
print(dict3)
return dict3
# for key1,value1 in data_dic:
# for key2 in dic_info.keys():
# if key2 == key1:
# dic_info[data_dic[key1]] = dic_info[key2]
# del dic_info[key2]
# break
# else:
# dic_info[data_dic[key1]] = ''
# continue
# print(data_dic)
# 采集信息
def SpiderByZJH(url, start_time): # dic_info 数据库中获取到的基本信息
try:
soup = RequestUrl(url)
except:
# 请求失败,输出错误信息
log.error(f'请求失败:{url}')
#重新放入redis
time.sleep(random.randint(60, 120))
soup = ''
if soup == '':
return
# 判断查找内容是否存在
# try:
# is_exist = soup.find('div',class_='con').text
# if is_exist == '没有查询到数据':
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, url, '没有查询到数据')
# return
# except:
# pass
# 先获取页数
page = soup.find('div', class_='pages').find_all('li')[-1]
total = page.find('b').text
for i in range(1,int(total)+1):
log.info(f'==========正在采集第{i}页=========')
if i == 1:
href = url
else:
# http://eid.csrc.gov.cn/101811/index_3_f.html
href = url.split('index')[0] + f'index_{i}.html'
try:
soup = browserRequest(href)
except:
# 请求失败,输出错误信息
log.error(f'请求失败:{url}')
# 重新放入redis
tr_list1 = soup.find('table', class_='m-table2')
# print(tr_list1)
tr_list = tr_list1.find_all('tr')
# pageIndex = 0
for tr in tr_list[1:]:
dic_info = {}
# pageIndex += 1
td_list = tr.find_all('td')
gpdm = td_list[0].text
short_name = td_list[1].text
companyname = td_list[2].text
shbk = td_list[3].text.replace(' ', '').replace('\r', '').replace('\n', '') #上市板块
# print(pdf_url)
selectSql = f"select count(1) from ipo_enterprise_list where gpdm='{gpdm}' and company_name='{companyname}'"
cursor.execute(selectSql)
count = cursor.fetchone()[0]
if count > 0:
log.info(f"{gpdm}-------{companyname}---已经存在")
continue
else:
dic_info = {
'公司全称': companyname,
'公司简称': short_name,
'股票代码': gpdm,
# '上市板块': shbk
}
info_url = 'http://eid.csrc.gov.cn/' + td_list[0].find('a')['href']
soup_info = RequestUrl(info_url)
try:
info_list = soup_info.find('table',class_='m-table3').find_all('tr')[1:]
except Exception as e:
log.info(f'error---{e}---第{i}页--{info_url}')
info_list = []
for tr_ in info_list:
td_list = tr_.find_all('td')
for td in td_list:
value = td.find('span').text.replace('\r', '').replace('\n', '').replace('\t', '').replace(' ', '')
span_tag = td.find('span')
span_tag.decompose()
name = td.text.replace(':', '')
dic_info[name] = value
# 插入数据库
final_dic = getmap(dic_info)
values_tuple = tuple(final_dic.values())
# log.info(f"{gpdm}-------{companyname}---新增")
insertSql = f"insert into ipo_enterprise_list(company_name,short_name,english_name,gpdm,shbk,gp_type,coreperson,address,hy_type,website,ipotime,emial_code,phone) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
cursor.execute(insertSql,values_tuple)
cnx.commit()
log.info(f"{gpdm}-------{companyname}---新增")
log.info(f"【{i}/{total}】-----------end,耗时{baseCore.getTimeCost(start_time, time.time())}")
if __name__ == '__main__':
num = 0
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
# 'Cookie': 'yfx_c_g_u_id_10008998=_ck23112014074614515077233960865; yfx_f_l_v_t_10008998=f_t_1700460466453__r_t_1700460466453__v_t_1700460466453__r_c_0; yfx_mr_10008998=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_mr_f_10008998=%3A%3Amarket_type_free_search%3A%3A%3A%3Abaidu%3A%3A%3A%3A%3A%3A%3A%3Awww.baidu.com%3A%3A%3A%3Apmf_from_free_search; yfx_key_10008998=; _yfx_session_10008998=%7B%22_yfx_firsttime%22%3A%221701508120899%22%2C%22_yfx_lasttime%22%3A%221701508120899%22%2C%22_yfx_visittime%22%3A%221701508120899%22%2C%22_yfx_domidgroup%22%3A%221701508120899%22%2C%22_yfx_domallsize%22%3A%22100%22%2C%22_yfx_cookie%22%3A%2220231202170840906620987838830281%22%7D; acw_tc=01c604a717025467485993784e5c9f1847d885d2c82ee192efdfd627ba',
'Host': 'eid.csrc.gov.cn',
'If-Modified-Since': 'Thu, 14 Dec 2023 08:06:01 GMT',
'If-None-Match': '"657ab769-95b5"',
# 'Referer': 'http://eid.csrc.gov.cn/201010/index_3.html',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
dic_parms = {}
# 读取数据库获取股票代码 简称 以及 社会信用代码
while True:
start_time = time.time()
# 沪市http://eid.csrc.gov.cn/101111/index.html 深市http://eid.csrc.gov.cn/101811/index.html 北交所http://eid.csrc.gov.cn/102611/index.html
# url 变量 翻页 栏目 http://eid.csrc.gov.cn/101811/index_3_f.html
# 沪市主板 沪市科创板
# url_parms = ['201010', '201014']
# url_parms = ['201011', '201013']
url_parms = ['202610']
for url_parm in url_parms:
url = getUrl(url_parm)
start_time_cj = time.time()
log.info(f'======开始处理======')
SpiderByZJH(url, start_time)
break
cursor.close()
cnx.close()
baseCore.close()
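A quick illustration of the getmap helper above: it remaps the Chinese field labels scraped from the CSRC detail table onto the English column names expected by ipo_enterprise_list, filling missing keys with ''. The sample values below are made up:

# purely illustrative input; real keys come from the detail-page table
dic_info = {'公司全称': '示例股份有限公司', '股票代码': '830001', '上市板块': '北交所'}
final_dic = getmap(dic_info)
# -> {'company_name': '示例股份有限公司', 'short_name': '', ..., 'gpdm': '830001', 'shbk': '北交所', ...}
# every one of the 13 mapped columns is present, so tuple(final_dic.values()) lines up with the INSERT statement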
import os
import os
+import subprocess
import uuid
import requests,time, json, sys
@@ -184,6 +185,11 @@ def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_ti
        pass
    else:
        log.info(f'====pdf解析失败====')
+        # 获取当前进程pid
+        current_pid = baseCore.getPID()
+        # todo: 重新启动新进程,杀死当前进程
+        subprocess.Popen([sys.executable] + sys.argv)
+        os.kill(current_pid, 9)
        return False
    num = num + 1
    att_id = tableUpdate(retData, com_name, year, pdf_name, num)
@@ -220,7 +226,7 @@ def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_ti
    # 将相应字段通过kafka传输保存
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
-        kafka_result = producer.send("researchReportTopic",
+        kafka_result = producer.send("researchReportNoticeTopic",
                                     json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
@@ -317,12 +323,13 @@ def gonggao_info(dic_info):
        for n1 in range(0, 3):
            try:
-                json_2 = requests.get(json_url, verify=False).json()
+                ip = baseCore.get_proxy()
+                json_2 = requests.get(json_url, proxies=ip,verify=False).json()
                break
            except:
                if n1 == 2:
                    sys.exit(0)
-                time.sleep(5)
+                time.sleep(60)
                continue
        try:
            pdf_url = json_2['data']['attach_url']
@@ -364,8 +371,8 @@ if __name__ =='__main__':
    while True:
        start_time = time.time()
        # 获取企业信息
-        social_code = baseCore.redicPullData('NoticeEnterprise:mgqy_socialCode_add')
-        # social_code = 'ZZSN23030900000316'
+        # social_code = baseCore.redicPullData('NoticeEnterprise:mgqy_socialCode_add')
+        social_code = 'ZZSN23030900000316'
        if not social_code:
            time.sleep(20)
            continue
@@ -383,6 +390,13 @@ if __name__ =='__main__':
        try:
            gonggao_info(dic_info)
        except:
+            time.sleep(30)
+            baseCore.rePutIntoR('NoticeEnterprise:mgqy_socialCode', social_code)
+            # 获取当前进程pid
+            current_pid = baseCore.getPID()
+            # todo: 重新启动新进程,杀死当前进程
+            subprocess.Popen([sys.executable] + sys.argv)
+            os.kill(current_pid, 9)
            log.info(f'-----error:{com_name}----{social_code}------')
......
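The hunks above, and the uptoOBS change below, apply the same fix: fetch a fresh proxy from baseCore before each attempt and back off for 60 seconds on failure. A generic sketch of that retry loop; the helper and its signature are illustrative, only the get_proxy/proxies usage mirrors the diff:

import time
import requests

def get_with_proxy_retry(url, headers, get_proxy, tries=3):
    # one fresh proxy per attempt, long back-off on failure, give up after `tries` attempts
    for attempt in range(tries):
        try:
            ip = get_proxy()          # e.g. baseCore.get_proxy(), returning a requests-style proxies dict
            return requests.get(url, headers=headers, proxies=ip, verify=False, timeout=20)
        except Exception:
            time.sleep(60)
    return None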
import os
@@ -58,12 +58,13 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
    headers['User-Agent'] = baseCore.getRandomUserAgent()
    for i in range(0, 3):
        try:
-            response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
+            ip = baseCore.get_proxy()
+            response = requests.get(pdf_url, headers=headers,verify=False,proxies=ip, timeout=20)
            file_size = int(response.headers.get('Content-Length'))
            retData['content'] = response.text
            break
-        except:
-            time.sleep(3)
+        except Exception as e:
+            time.sleep(60)
            continue
    name = str(getuuid()) + category
@@ -197,7 +198,7 @@ def sendKafka(social_code,newsUrl,dic_news):
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024*1024*20)
-        kafka_result = producer.send("researchReportTopic",
+        kafka_result = producer.send("researchReportNoticeTopic",
                                     json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
@@ -347,6 +348,18 @@ def createDriver():
    driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
    return driver

+def createDriver2():
+    chrome_driver = r'D:\cmd100\chromedriver.exe'
+    path = Service(chrome_driver)
+    chrome_options = webdriver.ChromeOptions()
+    chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
+    # 设置代理
+    # proxy = "127.0.0.1:8080" # 代理地址和端口
+    ip = baseCore.get_proxy()
+    chrome_options.add_argument('--proxy-server=' + str(ip))
+    driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
+    return driver

def gonggao_info(dic_info):
    # code = '00175.HK'
    code = dic_info[3]
......
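Both files in this commit switch the producer from researchReportTopic to researchReportNoticeTopic. For reference, a minimal sketch of that send path; the broker address, topic and max_request_size come from the diff, the payload dict is a stand-in:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
dic_news = {'title': 'demo notice', 'socialCreditCode': 'ZZSN23030900000316'}   # placeholder payload
kafka_result = producer.send("researchReportNoticeTopic",
                             json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))   # block until the broker acknowledges the record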
@@ -42,7 +42,7 @@ class EsMethod(object):
                    {
                        "match_phrase" : {
                            "labels.relationId" : {
-                                "query" : "1677"
+                                "query" : "1698"
                            }
                        }
                    }
@@ -59,7 +59,7 @@ class EsMethod(object):
            "aggs":{
                "duplicate_titles":{
                    "terms":{
-                        "field":"title.keyword",
+                        "field":"sourceAddress.keyword",
                        "min_doc_count":2,
                        "size":1000
                    },
@@ -101,10 +101,14 @@ def main(page, p, esMethod):
    result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
    total = result['hits']['total']['value']
-    if total == 0:
+    # if total == 0:
+    #     log.info('++++已没有数据+++++')
+    #     return
+    documents = result["aggregations"]["duplicate_titles"]["buckets"]
+    print(len(documents))
+    if len(documents) == 0:
        log.info('++++已没有数据+++++')
        return
-    documents = result["aggregations"]["duplicate_titles"]["buckets"]
    unique_document_ids = [bucket["duplicate_docs"]["hits"]["hits"][-1]["_id"] for bucket in documents]
    # 删除重复的文档
    for doc_id in unique_document_ids:
@@ -138,8 +142,8 @@ if __name__ == "__main__":
        # log.info('++++已没有数据+++++')
        # break
        start = time.time()
-        num_threads = 5
+        num_threads = 1
        run_threads(num_threads, esMethod, j)
        j += 1000
-        log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
+        log.info(f'1线程 每个处理200条数据 总耗时{time.time() - start}秒')
\ No newline at end of file
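The dedup script now groups documents by sourceAddress.keyword instead of title.keyword and deletes one document per bucket that has at least two hits. A sketch of the aggregation shape those buckets imply; the terms settings come from the diff, while the top_hits sub-aggregation named duplicate_docs is inferred from how main() reads the buckets:

aggs = {
    "duplicate_titles": {
        "terms": {"field": "sourceAddress.keyword", "min_doc_count": 2, "size": 1000},
        "aggs": {
            # each bucket carries its member documents so their _id values can be collected for deletion
            "duplicate_docs": {"top_hits": {"size": 10}}
        }
    }
}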
@@ -99,7 +99,7 @@ def get_content2():
                time.sleep(1)
                continue
            try:
-                resp = requests.get(url=href, headers=headers, verify=False)
+                resp = requests.get(url=href, headers=baseTool.headers, verify=False)
                resp.encoding = resp.apparent_encoding
                resp_text = resp.text
                soup = BeautifulSoup(resp_text, 'html.parser')
......
@@ -56,7 +56,7 @@ if __name__=="__main__":
    url = "https://mp.weixin.qq.com/"
    browser.get(url)
    # 可改动
-    time.sleep(20)
+    time.sleep(10)
    s = requests.session()
    #获取到token和cookies
......