Commit d82da41e  Author: 丁双波

Merge remote-tracking branch 'origin/master'

@@ -5,22 +5,18 @@ import socket
 import sys
 import time
-import fitz
 import logbook
 import logbook.more
 import pandas as pd
 import requests
 import zhconv
-import pymysql
 import redis
 from selenium import webdriver
 from selenium.webdriver.chrome.service import Service
-from openpyxl import Workbook
 import langid
 # create the connection pool
 import pymysql
-from pymysql import connections
 from DBUtils.PooledDB import PooledDB
 # import sys
 # sys.path.append('D://zzsn_spider//base//fdfs_client')
@@ -28,6 +24,15 @@ from DBUtils.PooledDB import PooledDB
 from fdfs_client.client import get_tracker_conf, Fdfs_client
 tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\base\\client.conf')
 client = Fdfs_client(tracker_conf)
+from obs import ObsClient
+import fitz
+from urllib.parse import unquote
+obsClient = ObsClient(
+    access_key_id='VEHN7D0TJ9316H8AHCAV',  # your Huawei Cloud access key (AK)
+    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # your Huawei Cloud secret key (SK)
+    server='https://obs.cn-north-1.myhuaweicloud.com'  # endpoint of your OBS bucket
+)
 # Note: call BaseCore.close() before the program exits to release related resources
 class BaseCore:
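For context, the PooledDB import kept above is the DBUtils connection-pool entry point; a minimal sketch of how such a pool is typically built (host, user and password are placeholders, not the project's real settings):

import pymysql
from DBUtils.PooledDB import PooledDB

# build a small pool; extra keyword arguments are passed through to pymysql.connect
pool = PooledDB(creator=pymysql, maxconnections=5, blocking=True,
                host='127.0.0.1', user='root', password='***',  # placeholders
                database='zzsn', charset='utf8mb4')
conn = pool.connection()   # borrow a connection from the pool
cursor = conn.cursor()
cursor.execute('SELECT 1')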
@@ -659,12 +664,10 @@ class BaseCore:
         create_time = retData['create_time']
         order_by = num
         selects = self.secrchATT(item_id,year,type_id)
-        # sel_sql = '''select id,item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
-        # self.cursor.execute(sel_sql, (item_id, year,type_id))
-        # selects = self.cursor.fetchone()
         if selects:
-            self.getLogger().info(f'com_name:{com_name} already exists')
-            id = selects[0]
+            self.getLogger().info(f'com_name:{com_name}--{year} already exists')
+            id = ''
             return id
         else:
             Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
@@ -695,6 +698,80 @@ class BaseCore:
         log = self.getLogger()
         log.info('======Failed to save the company CIK=====')
+    # upload to Huawei Cloud OBS, then parse the PDF's content and page count
+    # get a human-readable file size
+    def convert_size(self,size_bytes):
+        # conversion units, from bytes up to terabytes
+        units = ['bytes', 'KB', 'MB', 'GB', 'TB']
+        i = 0
+        while size_bytes >= 1024 and i < len(units) - 1:
+            size_bytes /= 1024
+            i += 1
+        return f"{size_bytes:.2f} {units[i]}"
+
+    def obsexist(self,file_path):
+        # # file path
+        # file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc'
+        # check whether the object already exists in OBS
+        response = obsClient.getObjectMetadata('zzsn', file_path)
+        if response.status >= 300:
+            self.getLogger().info('=====object not found in OBS=====')
+        else:
+            self.getLogger().info(f'=====object exists in OBS========{file_path}')
+
+    def uptoOBS(self,pdf_url, name_pdf,type_id, social_code,pathType,taskType,start_time):
+        headers = {}
+        retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
+                   'full_path': '',
+                   'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
+                   'create_time': '', 'page_size': '', 'content': ''}
+        headers['User-Agent'] = self.getRandomUserAgent()
+        for i in range(0, 3):
+            try:
+                response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
+                file_size = int(response.headers.get('Content-Length'))
+                break
+            except:
+                time.sleep(3)
+                continue
+        page_size = 0
+        for i in range(0, 3):
+            try:
+                name = name_pdf + '.pdf'
+                now_time = time.strftime("%Y-%m")
+                result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
+                with fitz.open(stream=response.content, filetype='pdf') as doc:
+                    page_size = doc.page_count
+                    for page in doc.pages():
+                        retData['content'] += page.get_text()
+                break
+            except:
+                time.sleep(3)
+                continue
+        if page_size < 1:
+            # PDF parsing failed
+            # print(f'======PDF parsing failed=====')
+            return retData
+        else:
+            try:
+                time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+                retData['state'] = True
+                retData['path'] = result['body']['objectUrl'].split('.com')[1]
+                retData['full_path'] = unquote(result['body']['objectUrl'])
+                retData['file_size'] = self.convert_size(file_size)
+                retData['create_time'] = time_now
+                retData['page_size'] = page_size
+            except Exception as e:
+                state = 0
+                takeTime = self.getTimeCost(start_time, time.time())
+                self.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
+                return retData
+        return retData
......
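A hedged sketch of how the new uptoOBS method might be driven by a report crawler (the URL, credit code and pathType are placeholders, not values from the commit):

import time
from BaseCore import BaseCore  # hypothetical import path

core = BaseCore()  # assumes BaseCore() can be constructed with its default connections
start_time = time.time()
retData = core.uptoOBS(pdf_url='https://example.com/annual-report.pdf',   # placeholder URL
                       name_pdf='sample-annual-report-2023',
                       type_id=1, social_code='91110000000000000X',       # placeholder credit code
                       pathType='XQWAnnualReport/', taskType='年报',       # "annual report" task label
                       start_time=start_time)
if retData['state']:
    # full_path is the uploaded object's URL; page_size/content come from the fitz parse
    print(retData['full_path'], retData['page_size'])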
@@ -475,7 +475,14 @@ def kegaishifan():
 # "Double Hundred Action" pilot enterprises
 def shuangbaiqiye():
-    pass
+    cnx, cursor = connectSql()
+    query = "SELECT CompanyName FROM Hundred"
+    cursor.execute(query)
+    result = cursor.fetchall()
+    cnx.commit()
+    com_namelist = [item[0] for item in result]
+    for item in com_namelist:
+        r.rpush('hundred:baseinfo', item)
 # "zhuanjingtexin" (specialized and innovative) SMEs
 def zhuangjingtexind():
@@ -484,7 +491,8 @@ def zhuangjingtexind():
 if __name__ == "__main__":
     start = time.time()
     # danxiangguanjun()
-    kegaishifan()
+    # kegaishifan()
+    shuangbaiqiye()
     # NoticeEnterprise()
     # AnnualEnterpriseIPO()
     # AnnualEnterprise()
......
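The new shuangbaiqiye body seeds the Redis list 'hundred:baseinfo' with company names; a minimal sketch of a consumer that would drain it (connection settings are assumptions):

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # assumed connection settings
while True:
    name = r.lpop('hundred:baseinfo')   # the producer rpush-es, so lpop preserves FIFO order
    if name is None:
        break                           # queue drained
    company = name.decode('utf-8')
    # hand `company` to the per-company crawler here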
@@ -541,7 +541,10 @@ class BaseCore:
         self.cursor.execute(query)
         token_list = self.cursor.fetchall()
         self.cnx.commit()
-        token = token_list[random.randint(0, len(token_list)-1)][0]
+        try:
+            token = token_list[random.randint(0, len(token_list)-1)][0]
+        except:
+            token = ''
         return token
     # remove tokens that have expired
......
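The try/except added here guards against an empty token_list; the same guard can be written without a bare except, e.g.:

import random

token = random.choice(token_list)[0] if token_list else ''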
# -*- coding: utf-8 -*-
import time
from urllib.parse import quote
import requests
import urllib3
from BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
# headers = {
# 'Host': 'xcx.qcc.com',
# 'Connection': 'keep-alive',
# 'Qcc-Platform': 'mp-weixin',
# 'Qcc-Timestamp': '',
# 'Qcc-Version': '1.0.0',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
# 'content-type': 'application/json',
# 'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
# 'Accept-Encoding': 'gzip, deflate, br,'
# }
headers = {
    'Host': 'xcx.qcc.com',
    'Connection': 'keep-alive',
    'x-request-device-type': 'Android',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8391',
    'Content-Type': 'application/json',
    'Qcc-Version': '1.0.0',
    'authMini': 'Bearer f51dae1a2fcb109fa9ec58bd4a85e5c5',
    'xweb_xhr': '1',
    'xcx-version': '2023.09.27',
    'Qcc-Platform': 'mp-weixin',
    'Qcc-CurrentPage': '/company-subpackages/business/index',
    'Qcc-Timestamp': '1696661787803',
    'Qcc-RefPage': '/company-subpackages/detail/index',
    'Accept': '*/*',
    'Sec-Fetch-Site': 'cross-site',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Dest': 'empty',
    'Referer': 'https://servicewechat.com/wx395200814fcd7599/307/page-frame.html',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'zh-CN,zh'
}
# look up the Qichacha (qcc) company id by company name or credit code
def find_id_by_name(start, token, name):
    urllib3.disable_warnings()
    qcc_key = name
    t = str(int(time.time()) * 1000)
    headers['Qcc-Timestamp'] = t
    url = f"https://xcx.qcc.com/mp-weixin/forwardApp/v3/base/advancedSearch?token={token}&t={t}&pageIndex=1&needGroup=yes&insuredCntStart=&insuredCntEnd=&startDateBegin=&startDateEnd=&registCapiBegin=&registCapiEnd=&countyCode=&province=&sortField=&isSortAsc=&searchKey={quote(qcc_key)}&searchIndex=default&industryV3="
    for lll in range(1, 6):
        try:
            resp_dict = requests.get(url=url, headers=headers, verify=False).json()
            break
        except Exception as e:
            print(f'{e}-------------retrying')
            time.sleep(5)
            continue
    # note: if all five attempts fail, resp_dict is unbound and the access below raises NameError
    time.sleep(2)
    # possible error responses:
    # {'status': 40101, 'message': '无效的sessionToken!'} -- invalid sessionToken
    # {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'} -- account rate-limited
    if resp_dict['status'] == 40101:
        KeyNo = False
        log.info(f'====token expired====elapsed {baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    if resp_dict['status'] == 401:
        KeyNo = False
        log.info(f'=====account rate-limited, the mini-program version needs upgrading=====elapsed {baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    try:
        if resp_dict['result']['Result']:
            result_dict = resp_dict['result']['Result'][0]
            KeyNo = result_dict['KeyNo']
            Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
            if Name == '':
                KeyNo = 'null'
        else:
            KeyNo = 'null'
    except:
        KeyNo = False
        log.info(f'====token expired====elapsed {baseCore.getTimeCost(start, time.time())}')
        return KeyNo
    log.info("{}, qcc KeyNo: {}".format(qcc_key, KeyNo))
    return KeyNo
\ No newline at end of file
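A sketch of calling find_id_by_name end to end (the token helper name and the sample company are assumptions for illustration):

import time

start = time.time()
token = baseCore.getToken()  # hypothetical helper: any function returning a live token from the pool
KeyNo = find_id_by_name(start, token, '小米科技有限责任公司')  # sample company name, illustrative only
if KeyNo and KeyNo != 'null':
    print(f'qcc KeyNo: {KeyNo}')
elif KeyNo == 'null':
    print('no match for that name')
else:
    print('token expired or account rate-limited')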
@@ -11,24 +11,28 @@ import logbook.more
 import pandas as pd
 import requests
 import zhconv
-import pymysql
 import redis
-from docx import Document
 from selenium import webdriver
 from selenium.webdriver.chrome.service import Service
-from openpyxl import Workbook
 import langid
 # create the connection pool
 import pymysql
-from pymysql import connections
 from DBUtils.PooledDB import PooledDB
 # import sys
 # sys.path.append('D://zzsn_spider//base//fdfs_client')
 from fdfs_client.client import get_tracker_conf, Fdfs_client
-tracker_conf = get_tracker_conf('E:\\kkwork\\zzsn_spider\\comData\\policylaw\\client.conf')
+tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\comData\\policylaw\\client.conf')
 client = Fdfs_client(tracker_conf)
+from obs import ObsClient
+import fitz
+from urllib.parse import unquote
+obsClient = ObsClient(
+    access_key_id='VEHN7D0TJ9316H8AHCAV',  # your Huawei Cloud access key (AK)
+    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # your Huawei Cloud secret key (SK)
+    server='https://obs.cn-north-1.myhuaweicloud.com'  # endpoint of your OBS bucket
+)
 # Note: call BaseCore.close() before the program exits to release related resources
 class BaseCore:
@@ -437,9 +441,9 @@ class BaseCore:
     # parse the page count of a Word document
-    def doc_page(self,file_path):
-        doc = Document(file_path)
-        return len(doc.sections)
+    # def doc_page(self,file_path):
+    #     doc = Document(file_path)
+    #     return len(doc.sections)
     def pdf_content(self,resp_content):
         # parse the content of a PDF file
         content = ''
@@ -507,9 +511,9 @@ class BaseCore:
             # retData['page_size'] = page_size
         return retData
-    def secrchATT(self,item_id,file_name,type_id):
-        sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s '''
-        self.cursor_.execute(sel_sql, (item_id, file_name, type_id))
+    def secrchATT(self,item_id,file_name,type_id,order_by):
+        sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
+        self.cursor_.execute(sel_sql, (item_id, file_name, type_id,order_by))
         selects = self.cursor_.fetchone()
         return selects
@@ -527,26 +531,81 @@ class BaseCore:
         page_size = retData['page_size']
         create_time = retData['create_time']
         order_by = num
-        selects = self.secrchATT(item_id,file_name,type_id)
-        if selects:
-            self.getLogger().info(f'com_name:{com_name} already exists')
-            id = selects[0]
-            return id,full_path
-        else:
-            Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
-            values = (
-                file_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
-                status, create_by,
-                create_time)
-            self.cursor_.execute(Upsql, values)  # insert
-            self.cnx_.commit()  # commit
-            self.getLogger().info("Update complete: {}".format(Upsql))
-            selects = self.secrchATT(item_id,file_name,type_id)
-            id = selects[0]
-            return id,full_path
+        Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+        values = (
+            file_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
+            status, create_by,
+            create_time)
+        self.cursor_.execute(Upsql, values)  # insert
+        self.cnx_.commit()  # commit
+        self.getLogger().info("Update complete: {}".format(Upsql))
+        selects = self.secrchATT(item_id,file_name,type_id,order_by)
+        id = selects[0]
+        return id,full_path
+
+    # get a human-readable file size
+    def convert_size(self,size_bytes):
+        # conversion units, from bytes up to terabytes
+        units = ['bytes', 'KB', 'MB', 'GB', 'TB']
+        i = 0
+        while size_bytes >= 1024 and i < len(units) - 1:
+            size_bytes /= 1024
+            i += 1
+        return f"{size_bytes:.2f} {units[i]}"
+
+    def uptoOBS(self,file_href,item_id,pathType,file_name):
+        headers = {}
+        category = os.path.splitext(file_href)[1]
+        retData = {'state': False, 'type_id': 7, 'item_id': item_id, 'group_name': 'group1', 'path': '',
+                   'full_path': '',
+                   'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
+                   'create_time': '', 'page_size': '', 'content': ''}
+        headers['User-Agent'] = self.getRandomUserAgent()
+        for i in range(0, 3):
+            try:
+                response = requests.get(file_href, headers=headers, verify=False, timeout=20)
+                file_size = int(response.headers.get('Content-Length'))
+                break
+            except:
+                time.sleep(3)
+                continue
+        page_size = 0
+        for i in range(0, 3):
+            try:
+                # name = file_name
+                if category in file_name:
+                    pass
+                else:
+                    file_name = file_name + '.' + category
+                result = obsClient.putContent('zzsn', f'{pathType}' + file_name, content=response.content)
+                break
+            except:
+                time.sleep(3)
+                continue
+        if page_size < 1:
+            # PDF parsing failed
+            # (note: page_size is never updated in this generic uploader, so this branch always returns)
+            # print(f'======PDF parsing failed=====')
+            return retData
+        else:
+            try:
+                time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+                retData['state'] = True
+                retData['path'] = result['body']['objectUrl'].split('.com')[1]
+                retData['full_path'] = unquote(result['body']['objectUrl'])
+                retData['file_size'] = self.convert_size(file_size)
+                retData['create_time'] = time_now
+            except Exception as e:
+                print(f'error:{e}')
+                return retData
+        return retData
......
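For reference, convert_size divides by 1024 until the value drops below one unit step; a small sketch of its output (assuming a constructed BaseCore instance):

core = BaseCore()  # assumes a constructed instance
print(core.convert_size(512))          # 512.00 bytes
print(core.convert_size(1536))         # 1.50 KB
print(core.convert_size(3 * 1024**2))  # 3.00 MB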
"""
新浪财经国内企业动态
"""
import json
import re
import time
import jieba
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from base.smart import smart_extractor
from base.BaseCore import BaseCore
# initialization: warm up the jieba Chinese tokenizer
jieba.cut("必须加载jieba")
smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
r = baseCore.r
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
    'Cache-Control': 'no-cache',
    'Pragma': 'no-cache'
}
taskType = '企业动态/新浪财经'  # task-log label: "company news / Sina Finance"
pattern = r"\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}"
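A quick illustration of what the timestamp pattern above extracts from the raw list markup (made-up input; the crawler later swaps the non-breaking space for a regular one and appends ':00'):

import re

pattern = r"\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}"
sample = '<li>2023-10-07\xa009:30 <a href="/news/1.html">headline</a></li>'
print(re.findall(pattern, sample))  # ['2023-10-07\xa009:30'] -- \s also matches the \xa0 non-breaking space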
# fetch a page and parse it with BeautifulSoup
@retry(tries=3, delay=1)
def getrequests(url):
    ip = baseCore.get_proxy()
    req = requests.get(url, headers=headers, proxies=ip)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'html.parser')
    return soup
# parse an article into the result dict
def getDic(social_code, title, href, pub_time):
    start_time = time.time()
    if 'http' not in href:
        href = 'https://finance.sina.com.cn' + href
    href_ = href.replace('https', 'http')
    try:
        # body text, tags included
        contentText = smart.extract_by_url(href_).text
        # body text, tags stripped
        content = smart.extract_by_url(href_).cleaned_text
        if content == '':
            log.error(f'{href}===page parsing failed')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===page parsing failed')
            return 0
    except:
        log.error(f'{href}===page parsing failed')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===page parsing failed')
        return 0
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_news = {
        'attachmentIds': '',
        'author': '',
        'content': content,
        'contentWithTag': contentText,
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'zh',
        'origin': '新浪财经',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': href,  # source link
        'summary': '',
        'title': title,
        'type': 2,
        'socialCreditCode': social_code,
        'year': pub_time[:4]
    }
    # print(dic_news)
    try:
        sendKafka(dic_news, start_time)
        log.info(f'Kafka send succeeded')
        try:
            insertMysql(social_code, href)
            log.info(f'saved to database')
        except:
            log.error(f'{href}===database insert failed')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===database insert failed')
    except:
        log.error(f'{href}===Kafka send failed')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===Kafka send failed')
    return 1
# send the record to Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    kafka_result = producer.send("researchReportTopic",
                                 json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    print(kafka_result.get(timeout=10))
    dic_result = {
        'success': 'true',
        'message': 'operation succeeded',
        'code': '200',
    }
    log.info(dic_result)
    # on success, record it in the task log
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
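sendKafka constructs a new KafkaProducer on every call; a module-level producer, as in this sketch with the same broker address, would avoid reconnecting per message:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])  # created once, then reused

def send_news(dic_news):
    # same topic and serialization as sendKafka above
    fut = producer.send("researchReportTopic",
                        json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    fut.get(timeout=10)  # block until the broker acknowledges the write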
# save the record to MySQL, used for de-duplication
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
    # fields of the news record
    list_info = [
        social_code,
        link,
        '新浪财经',
        '2',
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()

# check whether the article has already been collected
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
    sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
    cursor.execute(sel_sql, (url, social_code))
    selects = cursor.fetchone()
    return selects
def doJob():
    while True:
        start_time = time.time()
        social_code = baseCore.redicPullData('NewsEnterprise:gnqy_nyse_socialCode')
        # social_code = '914403007261824992'
        if not social_code or social_code == 'None':
            print(f'============no more data============waiting===============')
            time.sleep(1800)
        data = baseCore.getInfomation(social_code)
        gpdm = data[3]
        log.info(f'{social_code}==={gpdm}===start crawling')
        exchange = data[10]
        if gpdm == '' or not gpdm:
            log.error(f'{social_code}===stock code is empty')
            continue
        # prefix the stock code according to the exchange it is listed on
        if exchange == 1:
            gpdm_ = 'bj' + gpdm
        elif exchange == 2:
            gpdm_ = 'sh' + gpdm
        elif exchange == 3:
            gpdm_ = 'sz' + gpdm
        else:
            log.info(f'{social_code}==={gpdm}===not listed in Beijing, Shanghai or Shenzhen')
            continue
        page = 1
        num_ok = 0
        num_error = 0
        while True:
            url = f'http://vip.stock.finance.sina.com.cn/corp/view/vCB_AllNewsStock.php?symbol={gpdm_}&Page={page}'
            soup = getrequests(url)
            if '拒绝访问' in soup.text:  # "access denied": the IP has been blocked
                log.error(f'{social_code}===IP blocked')
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===IP blocked')
                r.rpush('NewsEnterprise:gnqy_nyse_socialCode', social_code)
                time.sleep(1800)
                break
            try:
                ul = soup.find('div', class_='datelist').find('ul')
                a_list = ul.find_all('a')
                time_list = re.findall(pattern, str(ul))
                for i in range(len(a_list)):
                    try:
                        title = a_list[i].text.lstrip().strip()
                        if title == '':
                            continue
                        href = a_list[i].get('href')
                        selects = selectUrl(href, social_code)
                        if selects:
                            log.info(f'{href}===already collected')
                            continue
                        if 'http' not in href:
                            href = 'https://finance.sina.com.cn' + href
                        pub_time = time_list[i].replace('\xa0', ' ') + ":00"
                        flg = getDic(social_code, title, href, pub_time)
                        if flg == 0:
                            num_error += 1
                        else:
                            num_ok += 1
                        time.sleep(0.5)
                    except Exception as e:
                        ee = e.__traceback__.tb_lineno
                        log.error(f'{social_code}===collection failed==reason: line {ee} {e}')
                        state = 0
                        takeTime = baseCore.getTimeCost(start_time, time.time())
                        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'collection failed==reason: line {ee} {e}')
                        break
            except:
                log.error(f"{social_code}==={gpdm}===failed to fetch the news list on page {page}")
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url, f'failed to fetch the news list')
            next_flg = soup.select('#con02-7 > table > tr')[1].select('div')[2].text
            if '下一页' not in next_flg:  # stop when there is no "next page" link
                break
            page += 1
            break  # note: leaves the pagination loop after the first extra page
        log.info(f'{social_code}==={gpdm}===total time {baseCore.getTimeCost(start_time, time.time())}===ok {num_ok}, failed {num_error}')
if __name__ == "__main__":
    doJob()
"""
新浪财经香港企业动态
"""
from datetime import datetime
import json
import re
import time
import jieba
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from base.smart import smart_extractor
from base.BaseCore import BaseCore
# initialization: warm up the jieba Chinese tokenizer
jieba.cut("必须加载jieba")
smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
r = baseCore.r
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
    'Cache-Control': 'no-cache',
    'Pragma': 'no-cache'
}
taskType = '企业动态/新浪财经'  # task-log label: "company news / Sina Finance"
# check whether a time string is in the expected format
def format_time(time_str):
    try:
        # try to parse the string with the expected format
        datetime_obj = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
        # make sure the parsed value round-trips to the original string
        if datetime_obj.strftime("%Y-%m-%d %H:%M:%S") == time_str:
            return time_str
    except ValueError:
        pass
    # otherwise, reformat to "%Y-%m-%d %H:%M:%S"
    # (note: this re-parses with the same format, so a string that failed above raises here as well)
    formatted_time = datetime.strftime(datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
    return formatted_time
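The fallback branch above re-parses with the very format that just failed, so it raises rather than repairs; a more defensive variant might try several layouts (the extra formats are guesses about what the page serves):

from datetime import datetime

def parse_pub_time(time_str):
    # try a few plausible layouts; return the raw string if none match
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
        try:
            return datetime.strptime(time_str, fmt).strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            continue
    return time_str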
# fetch a page and parse it with BeautifulSoup
@retry(tries=3, delay=1)
def getrequests(url):
    ip = baseCore.get_proxy()
    req = requests.get(url, headers=headers, proxies=ip)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'html.parser')
    return soup
# parse an article into the result dict
def getDic(social_code, title, href, pub_time):
    start_time = time.time()
    if 'http' not in href:
        href = 'https://finance.sina.com.cn' + href
    href_ = href.replace('https', 'http')
    try:
        # body text, tags included
        contentText = smart.extract_by_url(href_).text
        # body text, tags stripped
        content = smart.extract_by_url(href_).cleaned_text
        if content == '':
            log.error(f'{href}===page parsing failed')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===page parsing failed')
            return 0
    except:
        log.error(f'{href}===page parsing failed')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===page parsing failed')
        return 0
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_news = {
        'attachmentIds': '',
        'author': '',
        'content': content,
        'contentWithTag': contentText,
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'zh',
        'origin': '新浪财经',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': href,  # source link
        'summary': '',
        'title': title,
        'type': 2,
        'socialCreditCode': social_code,
        'year': pub_time[:4]
    }
    # print(dic_news)
    # the send/insert block below is still commented out, so this function
    # currently returns None and the caller counts the article as a failure
    # try:
    #     sendKafka(dic_news, start_time)
    #     log.info(f'Kafka send succeeded')
    #     try:
    #         insertMysql(social_code, href)
    #         log.info(f'saved to database')
    #     except:
    #         log.error(f'{href}===database insert failed')
    #         state = 0
    #         takeTime = baseCore.getTimeCost(start_time, time.time())
    #         baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===database insert failed')
    # except:
    #     log.error(f'{href}===Kafka send failed')
    #     state = 0
    #     takeTime = baseCore.getTimeCost(start_time, time.time())
    #     baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===Kafka send failed')
    # return 1
# send the record to Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    kafka_result = producer.send("researchReportTopic",
                                 json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    print(kafka_result.get(timeout=10))
    dic_result = {
        'success': 'true',
        'message': 'operation succeeded',
        'code': '200',
    }
    log.info(dic_result)
    # on success, record it in the task log
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
# save the record to MySQL, used for de-duplication
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
    # fields of the news record
    list_info = [
        social_code,
        link,
        '新浪财经',
        '2',
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()

# check whether the article has already been collected
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
    sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
    cursor.execute(sel_sql, (url, social_code))
    selects = cursor.fetchone()
    return selects
def doJob():
    # while True:
    start_time = time.time()
    # social_code = baseCore.redicPullData('NewsEnterprise:xgqy_nyse_socialCode')
    social_code = '91330000747735638J'  # fixed test credit code while the Redis pull above is commented out
    if not social_code or social_code == 'None':
        time.sleep(20)
    data = baseCore.getInfomation(social_code)
    gpdm = data[3]
    log.info(f'{social_code}==={gpdm}===start crawling')
    # if gpdm == '' or not gpdm:
    #     log.error(f'{social_code}===stock code is empty')
    #     continue
    gpdm_ = gpdm.split('.')[0]
    if len(gpdm_) != 5:
        gpdm_ = gpdm_.zfill(5)  # HK stock codes are zero-padded to five digits
    page = 1
    num_ok = 0
    num_error = 0
    while True:
        url = f'http://stock.finance.sina.com.cn/hkstock/go.php/CompanyNews/page/{page}/code/{gpdm_}/.phtml'
        soup = getrequests(url)
        if '拒绝访问' in soup.text:  # "access denied": the IP has been blocked
            log.error(f'{social_code}===IP blocked')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===IP blocked')
            # r.rpush('NewsEnterprise:xgqy_nyse_socialCode',social_code)
            time.sleep(1800)
            break
        next_flg = soup.find('div', class_='part02').text
        if '暂无数据' in next_flg:  # "no data yet": past the last page
            break
        try:
            li_list = soup.find('ul', class_='list01').find_all('li')
            for li in li_list:
                try:
                    a = li.find('a')
                    if a:
                        title = a.text
                        if title == '':
                            continue
                        href = a.get('href')
                        selects = selectUrl(href, social_code)
                        if selects:
                            log.info(f'{href}===already collected')
                            continue
                        pub_time = format_time(li.find('span').text)
                        print(title)
                        flag = getDic(social_code, title, href, pub_time)
                        if flag == 1:
                            num_ok += 1
                        else:
                            num_error += 1
                        time.sleep(0.5)
                except Exception as e:
                    ee = e.__traceback__.tb_lineno
                    log.error(f'{social_code}===collection failed==reason: line {ee} {e}')
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, url, f'collection failed==reason: line {ee} {e}')
                    continue
                # for incremental runs:
                # if selects:
                #     break
        except:
            log.error(f"{social_code}==={gpdm}===failed to fetch the news list on page {page}")
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'failed to fetch the news list')
        page += 1
    log.info(f'{social_code}==={gpdm}===total time {baseCore.getTimeCost(start_time, time.time())}===ok {num_ok}, failed {num_error}')
if __name__ == "__main__":
    doJob()
    baseCore.close()