Commit 204d228a Author: 薛凌堃

9/16

Parent a492eb60
......@@ -301,29 +301,45 @@ def BaseInfoAbroad_task():
def FBS():
cnx,cursor = connectSql()
# todo:调整为获取福布斯的数据库
# gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
# cursor.execute(gw_query)
# gw_result = cursor.fetchall()
gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
cursor.execute(gw_query)
gw_result = cursor.fetchall()
#获取国内企业
gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
# gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
# cursor.execute(gn_query)
# gn_result = cursor.fetchall()
#
# gn_social_list = [item[0] for item in gn_result]
gw_social_list = [item[0] for item in gw_result]
for item in gw_social_list:
r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
# for item in gn_social_list:
# if not r.exists(item):
# # r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# # r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('AnnualEnterprise:gnshqy_socialCode',item)
# # r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# # r.rpush('FinanceFromEast:eastfinance_socialCode',item)
closeSql(cnx,cursor)
#省属国有企业 盟市国有企业
def MengZhi():
cnx, cursor = connectSql()
gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where b.type=5 and a.SocialCode=b.SocialCode;"
# gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where b.type=4 and a.SocialCode=b.SocialCode;"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
# gw_social_list = [item[0] for item in gw_result]
# for item in gw_social_list:
# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
for item in gn_social_list:
if not r.exists(item):
# r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
r.rpush('AnnualEnterprise:gnshqy_socialCode',item)
# r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# r.rpush('FinanceFromEast:eastfinance_socialCode',item)
closeSql(cnx,cursor)
r.rpush('BaseInfoEnterpriseMz:gnqy_socialCode', item)
r.rpush('CorPersonEnterprise:gnqy_socialCode', item)
closeSql(cnx, cursor)
#将IPO的国外股票代码放到redis中
def yahooCodeFromSql():
......@@ -366,7 +382,8 @@ if __name__ == "__main__":
# NewsEnterprise_task()
# NewsEnterprise()
# BaseInfoEnterprise()
FBS()
# FBS()
MengZhi()
# NoticeEnterprise_task()
# AnnualEnterprise_task()
# NoticeEnterprise()
......
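In this first file the producers FBS() and MengZhi() only enqueue work: they rpush social credit codes onto Redis list keys such as 'NewsEnterpriseFbs:gwqy_socialCode', 'BaseInfoEnterpriseMz:gnqy_socialCode' and 'CorPersonEnterprise:gnqy_socialCode', and the worker scripts further down pull them back out through baseCore.redicPullData. A minimal sketch of that consumer side, assuming redicPullData is essentially a left-pop on the same Redis client r used by the producers:

def pull_social_code(key='CorPersonEnterprise:gnqy_socialCode'):
    # assumption: redicPullData amounts to an lpop on the list the producers rpush into
    item = r.lpop(key)
    if item is None:
        return None  # queue drained; the caller should sleep and retry
    return item.decode('utf-8') if isinstance(item, bytes) else item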
......@@ -333,7 +333,7 @@ if __name__ == '__main__':
start_time = time.time()
# 获取企业信息
# social_code = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
social_code = '91110000802100433B'
social_code = '91330000734530895W'
if social_code == '':
time.sleep(20)
continue
......
......@@ -37,11 +37,15 @@ def find_id_by_name(start,token,name):
time.sleep(5)
continue
time.sleep(2)
#{'status': 40101, 'message': '无效的sessionToken!'}
#{'status': 40101, 'message': '无效的sessionToken!'} {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'}
if resp_dict['status']==40101:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
if resp_dict['status']==401:
KeyNo = False
log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
try:
if resp_dict['result']['Result']:
result_dict = resp_dict['result']['Result'][0]
......
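The two status branches above make find_id_by_name(start, token, name) return False both when the sessionToken has expired (40101) and when the account is rate-limited (401), so the caller has to decide whether to swap tokens or back off. A minimal caller-side sketch under that assumption; gettoken() is a hypothetical helper for obtaining a fresh sessionToken:

import time

def resolve_keyno(name, token, max_retries=3):
    for _ in range(max_retries):
        start = time.time()
        key_no = find_id_by_name(start, token, name)
        if key_no:
            return key_no        # got a usable KeyNo
        token = gettoken()       # hypothetical: fetch a fresh sessionToken before retrying
        time.sleep(5)            # also gives the rate limit (401) time to clear
    return False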
"""
打开SEC网址——【FILINGS】——【Company Filing】——输入证券代码——选10-K和20-F为年报
"""
import json
import re
import time
from base.BaseCore import BaseCore
baseCore = BaseCore()
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from selenium import webdriver
url = 'https://www.sec.gov/edgar/browse/?CIK=1815846&owner=exclude'
def spider(com_name,cik):
url = f'https://www.sec.gov/edgar/browse/?CIK={cik}&owner=exclude'
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
# print(soup)
select_ann = soup.find_all('tr', class_='odd')
for tr in select_ann:
form_type = tr.find('td').text
if form_type == '20-F':
# print(tr)
# 获取原文链接
href = tr.find('a', class_='document-link')['href']
print(href)
if 'ix?doc' in href:
href = 'https://www.sec.gov/' + href.split('/ix?doc=/')[1]
else:
href = 'https://www.sec.gov' + href
print(href)
# 获取发布时间
a_list = tr.find_all('a')
# print(a_list)
for a in a_list:
text = a.text
match = re.search(pattern, text)
if match:
pub_date = match.group(0)
# print(pub_date)
year = pub_date[:4]
break
else:
pub_date = ''
year = ''
# 根据年报的链接,请求年报内容,不需要上传文件服务器,直接发送kafka
browser.get(href)
time.sleep(3)
i_page_source = browser.page_source
i_soup = BeautifulSoup(i_page_source, 'html.parser')
# print(i_page_source)
content = i_soup.text
# 采集下来正文内容,直接传输kafka
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
title = f'{com_name}:{year}年年度报告'
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': i_page_source,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': 'SEC美国证券交易委员会',
'publishDate': pub_date,
'sid': '1684032033495392257',
'sourceAddress': href, # 原文链接
'summary': '',
'title': title,
'type': 1,
'socialCreditCode': social_code,
'year': year
}
# print(dic_news)
# 将相应字段通过kafka传输保存
# try:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
# kafka_result = producer.send("researchReportTopic",
# json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
#
# print(kafka_result.get(timeout=10))
#
# dic_result = {
# 'success': 'true',
# 'message': '操作成功',
# 'code': '200',
# }
# print(dic_result)
#
# except Exception as e:
# dic_result = {
# 'success': 'false',
# 'message': '操作失败',
# 'code': '204',
# 'e': e
# }
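The Kafka hand-off for dic_news is only sketched in the commented-out block above; a minimal working version of the same idea, assuming the broker address and topic shown there (114.115.159.144:9092, "researchReportTopic") and the json/KafkaProducer imports already at the top of this file:

def send_to_kafka(dic_news):
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        kafka_result = producer.send("researchReportTopic",
                                     json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
        return {'success': 'true', 'message': '操作成功', 'code': '200'}
    except Exception as e:
        # str(e) rather than the exception object, which is not JSON-serializable
        return {'success': 'false', 'message': '操作失败', 'code': '204', 'e': str(e)}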
def getrequest(social_code,url,headers,data):
#通过请求post接口获取企业的CIK
response = requests.post(url=url, headers=headers, data=data) # ,proxies=ip)
response.encoding = response.apparent_encoding
# 检查响应状态码
if response.status_code == 200:
# 请求成功,处理响应数据
# print(response.text)
result = response.json()
# print(result)
pass
else:
# 请求失败,输出错误信息
print('请求失败:', response.status_code, response.text)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
result = ''
return result
#模拟浏览器
chromedriver = "D:/chrome/chromedriver.exe"
browser = webdriver.Chrome(chromedriver)
pattern = r"\d{4}-\d{2}-\d{2}"
if __name__ == '__main__':
headers = {
'authority': 'efts.sec.gov',
'method': 'POST',
'path': '/LATEST/search-index',
'scheme': 'https',
'accept': '*/*',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'content-length': '34',
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'origin': 'https://www.sec.gov',
'referer': 'https://www.sec.gov/',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'
}
url = 'https://efts.sec.gov/LATEST/search-index'
num = 0
taskType = '企业年报/雪球网'
while True:
start_time = time.time()
social_code = ''
# if not social_code:
# time.sleep(20)
# continue
# if social_code == 'None':
# time.sleep(20)
# continue
# if social_code == '':
# time.sleep(20)
# continue
# dic_info = baseCore.getInfomation(social_code)
# count = dic_info[15]
# code = dic_info[3]
# com_name = dic_info[4]
# if code is None:
# exeception = '股票代码为空'
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
# continue
code = 'BP'
#"MNSO" post请求 获取企业CIK
# payload = {"keysTyped":f"{code}","narrow":flag}
payload = {"keysTyped": "BP", "narrow":True}
data = json.dumps(payload)
result = getrequest(social_code,url,headers,data)
# print(result)
#判断接口返回的数据哪一条是该企业 根据股票代码
tickers = result['hits']['hits']
for ticker in tickers:
i_t = ticker['_source']['tickers']
if i_t == code:
cik = ticker['_id']
print(cik)
break
# break
spider(code, cik)  # spider() expects (com_name, cik); com_name is commented out above, so the ticker code is passed as a stand-in
break
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
print(soup)
select_ann = soup.find_all('tr',class_='odd')
for tr in select_ann:
want_type = tr.find('td').text
if want_type=='20-F':
print('yes')
#获取原文链接
td = tr.find('td').find('a',class_='document-link')['title_href']
print(td)
"""
"""
......@@ -293,11 +293,12 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
# 信息插入数据库
insert = InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type)
log.info(f'======={short_name}========{code}===插入公告库成功')
if insert:
# # 公告信息列表
# okCount = okCount + 1
# 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
log.info(f'======={short_name}========{code}===插入公告库成功')
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)
if result:
......@@ -319,7 +320,9 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
# except:
# pass
continue
else:
log.info(f'======={short_name}========{code}===已存在')
continue
if __name__ == '__main__':
num = 0
......
......@@ -440,11 +440,21 @@ class BaseCore:
def doc_page(self,file_path):
doc = Document(file_path)
return len(doc.sections)
def pdf_page(self,resp_content):
# 解析pdf文件
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
return page_size
def pdf_content(self,resp_content):
# 解析pdf文件内容
content = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
with fitz.open(stream=resp_content, filetype='pdf') as doc:
# page_size = doc.page_count
for page in doc.pages():
content += page.get_text()
break
except:
time.sleep(3)
continue
return content
# 替换为绝对路径之后,解析出来a.href
def uploadToserver(self,file_href,item_id):
......@@ -477,7 +487,7 @@ class BaseCore:
# page_size = self.pdf_page(resp_content)
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content)
result = client.upload_by_buffer(resp_content,file_ext_name=category.replace('.',''))
self.getLogger().info('-------文件上传成功------')
break
except:
......
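Both pdf_page and pdf_content take the raw response bytes of a downloaded PDF and parse them with fitz (PyMuPDF), retrying up to three times before giving up and returning an empty string. A minimal usage sketch, assuming the report is fetched with requests and BaseCore is instantiated the same way as in the other modules; pdf_url is a hypothetical annual-report link:

import requests
from base.BaseCore import BaseCore

baseCore = BaseCore()
resp = requests.get(pdf_url, timeout=30)    # pdf_url: hypothetical link to a report PDF
pages = baseCore.pdf_page(resp.content)     # page count via fitz
text = baseCore.pdf_content(resp.content)   # extracted text, '' if parsing fails three times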
......@@ -27,7 +27,7 @@ def doJob():
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('CorPersonEnterprise:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待
# social_code = 'ZZSN23011300000004'
# social_code = '91610000220568570K'
if social_code == None:
time.sleep(20)
continue
......@@ -143,7 +143,10 @@ def doJob():
if list_all:
for one_info in list_all:
name = one_info['personal_name']
sex = one_info['gender2']
try:
sex = one_info['gender2']
except:
sex = ''
education = ''
position = one_info['position_name']
Salary = ''
......
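The try/except added above simply tolerates records that have no gender2 field; an equivalent, slightly terser form (a sketch, not what this commit uses) would be:

sex = one_info.get('gender2', '')   # same effect as the try/except above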
......@@ -54,11 +54,10 @@ if __name__=="__main__":
# chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
chromedriver = r'D:/chrome/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(70)
time.sleep(60)
s = requests.session()
#获取到token和cookies
......
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
......@@ -55,19 +55,13 @@ def getZx(xydm, url, title, cnx, path):
# 动态信息列表
list_info = [
xydm,
title,
'',
content,
pub_time,
url,
'雅虎财经',
author,
'2',
'en'
'2'
]
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
......@@ -77,14 +71,10 @@ def getZx(xydm, url, title, cnx, path):
return exception
log.info(f"文章耗时,耗时{baseCore.getTimeCost(start_time_content, time.time())}")
try:
sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
cursor.execute(sel_sql, (url, social_code))
row = cursor.fetchone()
id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': id,
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': content,
......@@ -171,6 +161,7 @@ def scroll(xydm,name,gpdm):
except Exception as e:
log.error(f"{name}--{gpdm}--获取不到最后一条链接")
break
# todo:增量时 需打开注释
# try:
# selects = selectUrl(last_url_,xydm)
# except:
......@@ -189,12 +180,14 @@ def rePutIntoR(item):
if __name__ == "__main__":
path = r'D:\chrome\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
cnx = baseCore.cnx
cursor = baseCore.cursor
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gwqy_socialCode')
social_code = baseCore.redicPullData('NewsEnterpriseFbs:gwqy_socialCode')
# social_code = 'ZZSN22080900000046'
# 判断 如果Redis中已经没有数据,则等待
if not social_code :
......