Commit 864508c6  Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

@@ -27,7 +27,7 @@ headers = {
 'Cache-Control': 'no-cache',
 'Pragma': 'no-cache'
 }
-taskType = '企业动态/新浪财经'
+taskType = '企业动态/新浪财经/国内'
 pattern = r"\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}"
 # 获取响应页面
@@ -28,7 +28,7 @@ headers = {
 'Cache-Control': 'no-cache',
 'Pragma': 'no-cache'
 }
-taskType = '企业动态/新浪财经'
+taskType = '企业动态/新浪财经/香港'
 # 判断时间是否是正确格式
@@ -51,7 +51,7 @@ def format_time(time_str):
 def getrequests(url):
 ip = baseCore.get_proxy()
 req = requests.get(url, headers=headers,proxies=ip)
-req.encoding = req.apparent_encoding
+req.encoding = 'gbk'
 soup = BeautifulSoup(req.text, 'html.parser')
 return soup
@@ -117,7 +117,7 @@ def getDic(social_code, title, href, pub_time):
 # state = 0
 # takeTime = baseCore.getTimeCost(start_time, time.time())
 # baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
-# return 1
+return 1
 # 数据发送至Kafka
@@ -165,77 +165,77 @@ def selectUrl(url, social_code):
 def doJob():
 # while True:
 start_time = time.time()
-# social_code = baseCore.redicPullData('NewsEnterprise:xgqy_nyse_socialCode')
-social_code = '91330000747735638J'
+social_code = baseCore.redicPullData('NewsEnterprise:xgqy_nyse_socialCode')
+# social_code = '91330000747735638J'
 if not social_code or social_code == 'None':
 time.sleep(20)
 data = baseCore.getInfomation(social_code)
 gpdm = data[3]
 log.info(f'{social_code}==={gpdm}===开始采集')
 # if gpdm == '' or not gpdm:
 # log.error(f'{social_code}===股票代码为空')
 # continue
 gpdm_ = gpdm.split('.')[0]
 if len(gpdm_) != 5:
 gpdm_ = gpdm_.zfill(5)
 page = 1
 num_ok = 0
 num_error =0
 while True:
 url = f'http://stock.finance.sina.com.cn/hkstock/go.php/CompanyNews/page/{page}/code/{gpdm_}/.phtml'
 soup = getrequests(url)
 if '拒绝访问' in soup.text:
 log.error(f'{social_code}===ip封禁')
 state = 0
 takeTime = baseCore.getTimeCost(start_time, time.time())
 baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
 # r.rpush('NewsEnterprise:xgqy_nyse_socialCode',social_code)
 time.sleep(1800)
 break
 next_flg = soup.find('div',class_='part02').text
 if '暂无数据' in next_flg:
 break
 try:
 li_list = soup.find('ul', class_='list01').find_all('li')
 for li in li_list:
 try:
 a = li.find('a')
 if a:
 title = a.text
 if title == '':
 continue
 href = a.get('href')
 selects = selectUrl(href,social_code)
 if selects:
 log.info(f'{href}===已采集过')
 continue
 pub_time = format_time(li.find('span').text)
 print(title)
 flag = getDic(social_code,title,href,pub_time)
 if flag == 1:
 num_ok += 1
 else:
 num_error += 1
 time.sleep(0.5)
 except Exception as e:
 ee = e.__traceback__.tb_lineno
 log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
 state = 0
 takeTime = baseCore.getTimeCost(start_time, time.time())
 baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
 continue
 # 增量使用
 # if selects:
 # break
 except:
 log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
 state = 0
 takeTime = baseCore.getTimeCost(start_time, time.time())
 baseCore.recordLog(social_code, taskType, state, takeTime, url, f'获取信息列表失败')
 page += 1
 log.info(f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
"""
新浪财经国内企业公告
"""
from datetime import datetime
import json
import re
import time
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from obs import ObsClient
import fitz
from urllib.parse import unquote
from base.BaseCore import BaseCore
taskType = '企业公告/新浪财经/国内'
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
r = baseCore.r
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache'
}
pattern = r"\d{4}-\d{2}-\d{2}"
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV',  # Huawei Cloud access key (AK)
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # Huawei Cloud secret key (SK)
server='https://obs.cn-north-1.myhuaweicloud.com'  # endpoint of the OBS bucket
)
# Convert a size in bytes into a human-readable string
def convert_size(size_bytes):
# unit labels for successive divisions by 1024
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units) - 1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
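# Download the PDF at pdf_url, upload it to the OBS bucket under ZJH/<YYYY-MM>/,
# extract the page count and text with fitz, and return the attachment metadata in retData
# (retData['state'] stays False on failure).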
def uptoOBS(pdf_url, pdf_name, type_id, social_code):
start_time = time.time()
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'Liuliyuans',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
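# Download the PDF with up to 3 attempts; file_size is taken from the Content-Length header.
# Note: if every attempt fails, response is never bound and the code below will raise.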
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
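# Upload the raw PDF bytes to OBS and parse them with fitz (page count plus full text), up to 3 attempts.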
for i in range(0, 3):
try:
name = pdf_name + '.pdf'
now_time = time.strftime("%Y-%m")
result = obsClient.putContent('zzsn', f'ZJH/{now_time}/' + name, content=response.content)
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
break
except:
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
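# Fetch the id of an existing attachment row matching item_id, name, type_id and order_by.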
def secrchATT(item_id, name, type_id, order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
cursor_.execute(sel_sql, (item_id, name, type_id, order_by))
selects = cursor_.fetchone()
return selects
# Insert a row into the clb_sys_attachment table and return the new attachment id
def tableUpdate(retData, com_name, year, pdf_name, num):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size)
cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交
log.info("更新完成:{}".format(Upsql))
selects = secrchATT(item_id, pdf_name, type_id, order_by)
id = selects[0]
return id
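# Check brpa_source_article for an existing record of this URL for the company
# (note: the filter uses origin='证监会' and type='1').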
def ifInstert(social_code, pdf_url):
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='证监会' and type='1' '''
cursor.execute(sel_sql, (social_code, pdf_url))
selects = cursor.fetchone()
return selects
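# Skip PDFs already present in OBS; otherwise upload the file, insert the attachment row,
# and push the announcement record to Kafka. Returns True on success, False otherwise.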
def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time, com_name, num):
# Check whether the file already exists on the OBS server
# e.g. file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc'
now_time = time.strftime("%Y-%m")
file_path = 'ZJH/' + now_time + '/' + pdf_name + '.pdf'
response = obsClient.getObjectMetadata('zzsn', file_path)
if response.status >= 300:
log.info('=====文件不存在obs=====')
pass
else:
log.info(f'=====文件存在obs========{file_path}')
return False
# Upload to the Huawei Cloud OBS server
retData = uptoOBS(pdf_url, pdf_name, 8, social_code)
# Insert the attachment into the att table
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return False
att_id = tableUpdate(retData, com_name, year, pdf_name, num)
if att_id:
pass
else:
return False
content = retData['content']
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '新浪财经',
'publishDate': pub_time,
'sid': '1684032033495392257',
'sourceAddress': pdf_url, # 原文链接
'summary': '',
'title': pdf_name,
'type': 3,
'socialCreditCode': social_code,
'year': year
}
# print(dic_news)
# Send the assembled fields to Kafka for persistence
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
log.info(dic_result)
return False
# Normalize a publish-time string to the "%Y-%m-%d %H:%M:%S" format
def format_time(time_str):
try:
# Try to parse the string as a full datetime
datetime_obj = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# If it round-trips to the identical string, it is already well formatted
if datetime_obj.strftime("%Y-%m-%d %H:%M:%S") == time_str:
return time_str
except ValueError:
pass
# Otherwise re-format it: first as a full datetime, falling back to a bare date padded to midnight
try:
formatted_time = datetime.strftime(datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
except:
formatted_time = datetime.strftime(datetime.strptime(time_str, "%Y-%m-%d"), "%Y-%m-%d %H:%M:%S")
return formatted_time
# Fetch the page through a proxy and return the parsed BeautifulSoup response
@retry(tries=3, delay=1)
def getrequests(url):
ip = baseCore.get_proxy()
req = requests.get(url, headers=headers, proxies=ip)
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'html.parser')
return soup
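# Record a collected announcement link in brpa_source_article (origin='新浪财经', type='1').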
@retry(tries=3, delay=1)
def insertMysql(social_code, link, pub_time):
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# article record fields; pub_time is accepted but not stored, since create_time is set to now() by the SQL
list_info = [
social_code,
link,
'新浪财经',
'1',
]
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
# return a truthy value so the caller's "if insert:" success branch is taken
return True
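# Main job: currently runs once for a single hardcoded social credit code and processes
# page 1 of that company's announcement list.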
def doJob():
start_time = time.time()
social_code = '91440300192185379H'
data = baseCore.getInfomation(social_code)
gpdm = data[3]
com_name = data[1]
short_name = data[4]
log.info(f'{social_code}==={gpdm}===开始采集')
if gpdm == '' or not gpdm:
log.error(f'{social_code}===股票代码为空')
# continue
page = 1
num = 1
# while True:
url = f'https://vip.stock.finance.sina.com.cn/corp/view/vCB_AllBulletin.php?stockid={gpdm}&Page={page}'
soup = getrequests(url)
# if '拒绝访问' in soup.text:
# log.error(f'{social_code}===ip封禁')
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
# # r.rpush('NewsEnterprise:gnqy_nyse_socialCode',social_code)
# time.sleep(1800)
# break
# try:
div_flg = soup.find('div', class_='tagmain').text
if '暂时没有数据!' in div_flg:
log.info(f"{social_code}==={gpdm}===没有公告")
else:
ul = soup.find('div', class_='datelist').find('ul')
a_list = ul.find_all('a')
time_list = re.findall(pattern, str(ul))
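# a_list and time_list are assumed to be parallel: the i-th date extracted from the <ul> belongs to the i-th link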
for i in range(len(a_list)):
# try:
name_pdf = a_list[i].text.strip()
if name_pdf == '':
continue
href = a_list[i].get('href')
if 'http' not in href:
href = 'https://finance.sina.com.cn' + href
selects = ifInstert(social_code, href)
if selects:
log.info(f'{href}===已采集')
continue
pub_time = format_time(time_list[i])
year = pub_time[:4]
result = GetContent(href, name_pdf, social_code, year, pub_time, start_time, com_name, num)
num += 1
if result:
# 公告信息列表
log.info(f'{short_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, '成功')
# Only insert into the database after the Kafka send has succeeded
insert = insertMysql(social_code, href, pub_time)
if insert:
log.info(f'===={social_code}========{name_pdf}=====插入库成功')
pass
else:
continue
# time.sleep(0.5)
# except Exception as e:
# ee = e.__traceback__.tb_lineno
# log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
# break
# except:
# log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, url, f'获取信息列表失败')
# next_flg = soup.select('#con02-7 > table > tr')[1].select('div')[2].text
# if '下一页' not in next_flg:
# break
# page += 1
# break
# log.info(
# f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
if __name__ == '__main__':
doJob()