Commit 4c5b1a70  Author: XveLingKun

0906

Parent 255f8c19
@@ -2,6 +2,13 @@
<project version="4">
<component name="PublishConfigData" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData>
<paths name="root@114.115.141.81:22">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@114.115.141.81:22 (2)">
<serverdata>
<mappings>
@@ -16,6 +23,13 @@
</mappings>
</serverdata>
</paths>
<paths name="root@114.116.49.86:22 (2)">
<serverdata>
<mappings>
<mapping local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@114.116.54.108:22">
<serverdata>
<mappings>
...
@@ -438,6 +438,464 @@ def test08():
else:
print(f'键 {key} 中不存在值 {target_value}')
def test09():
import re
text = "找到与 国资委 相关的中文图书 247 种,用时 0.010 秒 当前为第 25 页"
match = re.search(r'(\d+)\s*种', text)
if match:
print("找到匹配的数字:", match.group(1))
else:
print("没有找到匹配的数字")
def test10():
from bs4 import BeautifulSoup
import pandas as pd
# 假设html_content是包含上述HTML的字符串
html_content = """
<dl>
<dd>【作 者】上海市国有资产监督管理委员会编著</dd>
<dd>【形态项】324</dd>
<dd>【出版项】上海:上海社会科学院出版社 , 2021.04</dd>
<dd>【ISBN号】978-7-5520-3539-1</dd>
<dd>【中图法分类号】F127.51</dd>
<dd>【原书定价】85.00</dd>
<dd>【参考文献格式】上海市国有资产监督管理委员会编著.守望相助 携手小康 上海市国资委系统精准扶贫案例集.上海:上海社会科学院出版社, 2021.04.</dd>
</dl>
"""
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 找到所有的<dd>标签
dd_tags = soup.find_all('dd')
# 创建一个空字典来存储提取的数据
data_dict = {}
# 遍历<dd>标签,提取数据
for dd in dd_tags:
# 假设每个<dd>标签都是以【开头和】结尾
content = dd.get_text()
key = content[1:content.find('】')].strip()
value = content[content.find('】') + 1:].strip()
data_dict[key] = value
# 使用pandas将字典写入Excel表格
df = pd.DataFrame([data_dict]) # 将字典转换为DataFrame
excel_path = 'data.xlsx' # Excel文件路径
sheet_name = 'Sheet2' # Excel中的sheet页名称
# 将DataFrame写入Excel
df.to_excel(excel_path, sheet_name=sheet_name, index=False)
print(f"数据已写入{excel_path}的{sheet_name}页")
def test11():
import pandas as pd
# 假设我们有两个DataFrame,每个DataFrame包含不同的数据
df1 = pd.DataFrame({
'作 者': ['上海市国有资产监督管理委员会编著'],
'形态项': ['324'],
# ... 其他键值对
})
df2 = pd.DataFrame({
'作 者': ['其他作者'],
'形态项': ['其他形态'],
# ... 其他键值对
})
# Excel文件路径
excel_path = 'data.xlsx'
# 使用ExcelWriter来写入不同的sheet页
with pd.ExcelWriter(excel_path) as writer:
# 将df1写入名为'Sheet1'的sheet页
df1.to_excel(writer, sheet_name='Sheet1', index=False)
# 将df2写入名为'Sheet2'的sheet页
df2.to_excel(writer, sheet_name='Sheet2', index=False)
print(f"数据已写入{excel_path}的不同sheet页")
def test12():
from bs4 import BeautifulSoup
# 假设html_content是包含上述HTML的字符串
html_content = '''
<div id="detail-info" class="book_na_bottom show">
<div class="book_item"><span class="book_val">所有责任者:</span><span class="book_type">李向荣著</span></div>
<div class="book_item"><span class="book_val">标识号:</span><span class="book_type">ISBN&nbsp;:&nbsp;978-7-203-11658-5&nbsp;</span></div>
<div class="book_item"><span class="book_val">出版、发行地:</span><span class="book_type">太原</span></div>
<div class="book_item"><span class="book_val">关键词:</span><span class="book_type">国有企业---混合所有制---企业改革---研究---中国</span></div>
<div class="book_item"><span class="book_val">语种:</span><span class="book_type">Chinese 汉语</span></div>
<div class="book_item"><span class="book_val">分类:</span><span class="book_type">中图分类&nbsp;:&nbsp;F279.241&nbsp;</span></div>
<div class="book_item"><span class="book_val">载体形态:</span><span class="book_type">210页</span></div>
</div>
'''
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 查找所有包含类名'book_item'的div标签
book_items = soup.find_all('div', class_='book_item')
# 创建一个空字典来存储提取的数据
book_info = {}
# 遍历每个book_item,提取键和值
for item in book_items:
key = item.find('span', class_='book_val').get_text().rstrip(':')
value = item.find('span', class_='book_type').get_text().strip()
book_info[key] = value
print(book_info)
def test13():
from bs4 import BeautifulSoup
# 假设html_content是包含上述HTML的字符串
html_content = '''
<div id="book_wr" class="book_wr"><div class="book_name">
国有企业混合所有制改革研究
</div><div class="book_item">
<span class="book_type">文献类型:</span>
<span style="margin-left: -8px;" class="book_val">专著</span>
</div><div class="book_item">
<span class="book_type">责任者:</span>
<a href="javascript:void(searchAuthor('李向荣'));" title="李向荣">
<span style="margin-left: -8px;" class="book_t_val">李向荣</span>
</a>
</div><div class="book_item">
<span class="book_type">出版、发行者:</span>
<a href="javascript:void(searchPublisher('山西人民出版社'));" title="山西人民出版社">
<span style="margin-left: -8px;" class="book_t_val">山西人民出版社</span>
</a>
</div><div class="book_item">
<span class="book_type">出版发行时间:</span>
<span style="margin-left: -8px;" class="book_val">2020</span>
</div><div class="book_item">
<span class="book_type">来源数据库:</span>
<span style="margin-left: -8px;" class="book_val">馆藏中文资源</span>
</div><div class="book_item">
<span class="book_val">分享到:</span>
<span>
<a href="javascript:void(share2sina('-5060698567676905396', '国有企业混合所有制改革研究', '馆藏中文资源', 'ucs01'));" class="" title="新浪微博"> <i class="xlwb"> </i></a>
</span>
</div>
'''
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(html_content, 'html.parser')
# 查找所有包含类名'book_item'的div标签
book_items = soup.find_all('div', class_='book_item')
# 创建一个空字典来存储提取的数据
book_info = {}
# 遍历每个book_item,提取键和值
for item in book_items:
try:
key = item.find('span', class_='book_type').get_text().rstrip(':')
value = item.find('span', class_='book_val').get_text().strip()
book_info[key] = value
except:
continue
# 第一个书名作为特殊处理,不包含冒号
book_info['书名'] = soup.find('div', class_='book_name').get_text().strip()
print(book_info)
def test14():
import re
# 假设这是你的HTML字符串
html_content = '''
<a href="javascript:void(0);" onclick="makeDetailUrl(this, '/search/showDocDetails?', '-49925015253155232', 'ucs01', '国资委');" target="_blank">
'''
# 使用正则表达式尝试提取参数
match = re.search(r"makeDetailUrl\(\s*this,\s*'(/search/showDocDetails?)([^']+)'", html_content)
if match:
print(match)
url_base = match.group(1)
params = match.group(2)
params_list = params.split(',')
doc_id = params_list[1].strip("'")
data_source = params_list[2].strip("'")
query = params_list[3].strip("'")
# 拼接URL
detail_url = url_base + f"docId={doc_id}&dataSource={data_source}&query={query}"
print(detail_url)
else:
print("无法提取参数")
def test15():
import pandas as pd
import xlsxwriter
# 创建一个示例 DataFrame
data = {
'Column1': ['Text with!@# illegal characters', 'Clean text123', 'More text^&* here'],
'Column2': ['Another text$% example', '1234', 'Text with() special characters']
}
df = pd.DataFrame(data)
# 创建一个新的 Excel 文件
excel_file = 'filtered_data.xlsx'
# 创建一个 Excel writer 对象
workbook = xlsxwriter.Workbook(excel_file)
worksheet = workbook.add_worksheet()
# 写入列名到 Excel 文件
for col_num, value in enumerate(df.columns):
worksheet.write(0, col_num, value)
# 写入 DataFrame 中的数据到 Excel 文件
for row_num, row_data in df.iterrows():
for col_num, value in enumerate(row_data):
worksheet.write(row_num + 1, col_num, value) # +1 是因为第一行被用于列名
# 保存并关闭 Excel 文件
workbook.close()
def test16():
import openpyxl
import redis
# 连接到Redis服务器
redis_client = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
list_info = ['IN-20240820-0031',
'IN-20240820-0032',
'IN-20240820-0033',
'IN-20240820-0034',
'IN-20240820-0035',
'IN-20240820-0036',
'IN-20240820-0037',
'IN-20240820-0038',
'IN-20240820-0039',
'IN-20240820-0040',
'IN-20240820-0041',
'IN-20240820-0042']
for i in list_info:
redis_client.lpush('weixin:zc', i)
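# --- illustrative sketch (editor's addition, not part of this commit) ---
# test16 seeds the Redis list 'weixin:zc' with LPUSH; a consumer would normally pop
# from the opposite end with RPOP so the IDs come back in the order they were pushed.
# Connection settings are copied from test16; the "processing" step is hypothetical.
def example_drain_weixin_queue():
    import redis
    client = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
    while True:
        raw = client.rpop('weixin:zc')
        if raw is None:
            break  # queue is empty
        print('would process:', raw.decode('utf-8'))  # redis-py returns bytes by default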
def test17():
    import re
    import json
text = '''{"上游": [原材料供应, 关键零部件生产, 软件开发],"中游": [智能装备制造, 生产线集成, 数据处理与分析],"下游": [产 品装配, 售后服务, 智能应用与解决方案]}
解释:
- 上游:涉及原材料和关键零部件的生产,以及为智能制造提供软件支持的开发活动。
- 中游:主要包括智能装备的制造、整个生产线的集成、以及生产过程中数据的处理与分析。
- 下游:关注产品的最终装配、提供售后服务,以及智能产品或解决方案的应用实施。'''
pattern = r'\{.*?\}' # 使用非贪婪模式匹配花括号内的内容
match = re.search(pattern, text, re.DOTALL) # 添加 re.DOTALL 使 . 匹配任何字符,包括换行符
print(f'match:{match}')
if match:
extracted_data = match.group(0)
print(extracted_data)
print(type(extracted_data))
        try:
            results = json.loads(extracted_data)
            print(results)
        except json.JSONDecodeError as e:
            # the bracketed items are unquoted, so strict JSON parsing is expected to fail
            print(f"JSON 解析错误: {e}")
def test18():
import re
text = '''{"上游": [原材料供应, 关键零部件生产, 软件开发],"中游": [智能装备制造, 生产线集成, 数据处理与分析],"下游": [产品装配, 售后服务, 智能应用与解决方案]}'''
# 定义一个函数来提取指定部分的列表
def extract_list(text, part):
pattern = rf'"{part}": \[(.*?)\]'
match = re.search(pattern, text)
if match:
return match.group(1).strip().split(', ')
else:
return None
# 提取各部分的数据
upstream = extract_list(text, '上游')
midstream = extract_list(text, '中游')
downstream = extract_list(text, '下游')
print("上游:", upstream)
print("中游:", midstream)
print("下游:", downstream)
# # 尝试解析 JSON
# try:
# data = json.loads(formatted_text)
# print(data)
# except json.JSONDecodeError as e:
# print(f"JSON 解析错误: {e}")
def test19():
    from selenium import webdriver
    from bs4 import BeautifulSoup
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
# opt.binary_location = r'F:\spider\Google\Chrome\Application\chrome.exe'
# chromedriver = r'F:\spider\cmd100\chromedriver.exe'
opt.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
chromedriver = r'D:\cmd100\chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://www.12371.cn/2024/08/29/ARTI1724920928243624.shtml"
browser.get(url)
page_source = browser.page_source
# print(page_source)
news_soup = BeautifulSoup(page_source, 'html.parser')
# print(news_soup)
try:
title = news_soup.find('h1', class_='big_title').text
source = news_soup.find('div', class_='title_bottom').find('i').text
contentwithTag = news_soup.find('div', class_='word')
content = contentwithTag.text
except Exception as e:
return
dic_info = {
'title': title,
'origin': source,
'contentWithTag': str(contentwithTag),
'content': content,
'subjectId': '1681549361661489154',
'checkStatus': 1,
'deleteFlag': 0,
}
print(dic_info)
def test20():
import openpyxl
import redis
# 连接到Redis服务器
redis_client = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
# 打开Excel文件
workbook = openpyxl.load_workbook(r'D:\kkwork\企业数据\数据组提供\20240830_YJZX_胡润独角兽榜单_2019-2024_数据对比&股东信息v3(最大股东信息采集).xlsx')
# 选择要读取的工作表
worksheet = workbook['最大持股企业信息_采集']
# 选择要读取的列
column_index = 0 # 选择第2列
# 遍历指定列的单元格,并将值放入Redis列表
for row in worksheet.iter_rows(values_only=True):
try:
cell_value = row[30] + '|' + row[31]
except:
print(row[30])
continue
# cell_value = row[1]
redis_client.rpush('shareHolderInfo', cell_value)
# 关闭Excel文件
workbook.close()
def test21():
import re
def classify_report_type(title):
# 匹配以年份数字开头的年度财务报告
year_pattern = r'(.*?)\d{4}年?(年度财务报告|年报)'
if re.match(year_pattern, title):
return '年度报告'
# 匹配年年度报告
if "年年度报告" in title:
return "年度报告"
# 匹配半年度报告
if "半年度" in title or "半年报" in title or "半年财务报告" in title or "半年审计报告" in title:
return "半年度报告"
# 匹配财务报告
if "财务" in title or "财务状况报告" in title:
return "财务报告"
# 匹配审计报告
if "审计" in title or "审计报告" in title or "审计结果" in title:
return "审计报告"
# 如果没有匹配到,返回'未知报告'
return '未知报告'
# 测试标题
titles = [
"中国核工业集团公司2013年年报",
"中国核工业集团有限公司2020年度财务报告",
"2023年年度财务报告123",
"2023年年度财务报告-123",
]
print(classify_report_type(titles[0]))
def test22():
import re
# 标题
title = "中国核工业集团有限公司2024年半年度报告"
# 使用正则表达式匹配年份
year = re.search(r'\d{4}', title)
# 打印匹配到的年份
if year:
print(year.group())
else:
print("No year found in the title.")
def test23():
import re
# 标题
title = "中国核工业集团有限公司2024年度报告"
year_pattern = r'\d{4}年度财务报告'
if re.match(year_pattern, title):
return '年度财务报告'
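# --- illustrative sketch (editor's addition, not part of this commit) ---
# Combines the two ideas above: pull the 4-digit year out of a report title (as in test22)
# and classify the report type with one pattern (as in test21/test23). Titles are sample data.
def example_year_and_report_type():
    import re
    titles = [
        "中国核工业集团公司2013年年报",
        "中国核工业集团有限公司2024年半年度报告",
        "2023年年度财务报告",
    ]
    for title in titles:
        year_match = re.search(r'\d{4}', title)
        year = year_match.group() if year_match else ''
        if '半年' in title:
            report_type = '半年度报告'
        elif re.search(r'\d{4}年?(年度财务报告|年报|年度报告)', title):
            report_type = '年度报告'
        else:
            report_type = '未知报告'
        print(year, report_type)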
if __name__ == "__main__":
# import queue
#
@@ -549,4 +1007,4 @@ if __name__ == "__main__":
# test04()
# test05()
# test05()
-test08()
+test21()
\ No newline at end of file
@@ -7,16 +7,17 @@ import pymongo
from bson import ObjectId
from openpyxl import Workbook, load_workbook
-from base.BaseCore import BaseCore
-baseCore = BaseCore()
+import sys
+sys.path.append('../../base')
+import BaseCore
+baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
db_storage2 = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
-'股东信息0621']
+'最大股东信息0902']
class File():
@@ -160,10 +161,12 @@ class Info():
def update_holder(self, no, dic_info):
db_storage2.update_one({'序号': str(no)}, {'$set': {'最大持股名称': dic_info['最大持股名称'], '持股比例': dic_info['持股比例'], '企业标签': dic_info['企业标签']}})
pass
def update_info(self, no, dic_info):
db_storage2.update_one({'序号': str(no)}, {
'$set': {'股东企业信用代码': dic_info['股东企业信用代码'], '股东企业标签': dic_info['股东企业标签']}})
pass
def insert_into(self, dic_info):
if dic_info['股东序号序号']:
@@ -179,6 +182,16 @@ class Info():
print(result)
pass
def bigshearholder_insert(self,dic_info):
insertion_result = db_storage2.insert_one(dic_info)
inserted_id = insertion_result.inserted_id
return inserted_id
def bigupdate_info(self, no, dic_info):
db_storage2.update_one({'企业信用代码(中国内地企业需填写信用代码)': str(no)}, {
'$set': {'最大持股企业信用代码': dic_info['最大持股企业信用代码'], '最大持股企业标签': dic_info['最大持股企业标签']}})
pass
from selenium import webdriver
class Driver():
...
@@ -26,7 +26,7 @@ if __name__ == "__main__":
name = input('所属用户:')
driver = create_driver()
driver.get(url)
-time.sleep(60)
+time.sleep(80)
cookies = driver.get_cookies()
# print(driver.get_cookies())
...
"""采集最大股东信息"""
import json
import requests, time
from bs4 import BeautifulSoup
import urllib3
from retry import retry
from getTycId import getTycIdByXYDM
import sys
sys.path.append('../../base')
import BaseCore
baseCore = BaseCore.BaseCore()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
cnx = baseCore.cnx_
cursor = baseCore.cursor_
list_all_1 = []
list_all_2 = []
taskType = '天眼查/股东信息'
from classtool import Token, Info
token = Token()
Info = Info()
@retry(tries=3, delay=1)
def get_html(tycid, driver, dic_info):
url = f"https://www.tianyancha.com/company/{tycid}"
driver.get(url=url)
time.sleep(3)
page_source = driver.page_source
soup = BeautifulSoup(page_source, 'html.parser')
script = soup.find('script', attrs={'id': '__NEXT_DATA__'}).text
script = json.loads(script)
script = script['props']['pageProps']['dehydratedState']['queries'][0]['state']['data']['data']['tagListV2']
tag_list = []
filter_list = ['存续', '曾用名', '竞争风险', '司法案件', '合作风险', '股权出质', '仍注册']
for tag in script:
if tag['title'] in filter_list:
continue
if tag['color'] == '#FF463C':
continue
tag_list.append(tag['title'])
dic_info['企业标签'] = tag_list
try:
div_part = soup.find('div', attrs={'data-dim': 'holder'})
# div_part.find('div', class_='dimHeader_root__XTCLe')
except:
return -1, dic_info, -1
if div_part is None:
return -2, dic_info, -2
else:
try:
tmp_field = div_part.find('h3', class_='dimHeader_main-title-txt__GPoaZ').text
if '股东信息' in tmp_field:
total = 0
log.info('股东信息')
try:
total = div_part.find('div', class_='dim-tab-root').find('span').get_text().split('股东信息')[1].replace(
' ', '')
except:
try:
total = div_part.find('div', class_='dim-tab-root').find('span').get_text().split('最新公示')[
1].replace(
' ', '')
except:
total = div_part.find('span', class_="dimHeader_main-title-cnt__zzW8k").text
return int(total), dic_info, 1
else: # 否则就是主要股东接口
if '主要股东' in tmp_field:
log.info('主要股东')
total = div_part.find('div', class_='dim-tab-root').find('span').get_text().split('最新公示')[1].replace(' ', '')
return int(total), dic_info, 2
except Exception as e:
            return 0, dic_info, 0  # keep the same arity as the other return paths (charge, dic_info, num)
@retry(tries=5, delay=3)
def get_page(url, s, headers):
ip = baseCore.get_proxy()
res = s.get(url=url, headers=headers, proxies=ip, timeout=(5, 10))
if res.status_code != 200:
raise
data_page = res.json()
# log.info(f'接口获取总数---{data_page}')
try:
total_page_ = data_page['data']['total']
except:
raise
return total_page_, data_page
@retry(tries=5, delay=3)
def get_page1(url, s, headers):
ip = baseCore.get_proxy()
res = s.get(url=url, headers=headers, proxies=ip, timeout=(5, 10))
if res.status_code != 200:
raise
data_page = res.json()
# log.info(f'接口获取总数---{data_page}')
try:
total_page_ = data_page['data']['stockHolder']['total']
except:
raise
return total_page_, data_page
@retry(tries=5, delay=3)
def post_page(url, s, headers, payload):
ip = baseCore.get_proxy()
res = s.post(url=url, headers=headers, data=json.dumps(payload), proxies=ip, timeout=(5, 10))
if res.status_code != 200:
raise
json_info = res.json()
try:
total_page_ = json_info['data']['total']
except:
raise
return total_page_, json_info
from selenium import webdriver
def create_driver():
path = r'D:\soft\msedgedriver.exe'
# options = webdriver.EdgeOptions()
options = {
"browserName": "MicrosoftEdge",
"ms:edgeOptions": {
"extensions": [], "args": ["--start-maximized"] # 添加最大化窗口运作参数
}
}
session = webdriver.Edge(executable_path=path, capabilities=options)
return session
def login(driver):
cookies = {}
cookies_list, id_cookie, user_name = token.get_cookies()
if cookies_list:
pass
else:
log.info("没有账号了,等待30分钟")
time.sleep(30 * 60)
return '', '', ''
log.info(f'=====当前使用的是{user_name}的cookie======')
for cookie in cookies_list:
driver.add_cookie(cookie)
time.sleep(3)
driver.refresh()
time.sleep(3)
for cookie in cookies_list:
cookies[cookie['name']] = cookie['value']
s = requests.Session()
s.cookies.update(cookies)
return driver, id_cookie, s
def doJob():
# for social_code in social_code_list:
driver = create_driver()
url = 'https://www.tianyancha.com/'
driver.get(url)
driver.maximize_window()
for i in range(1000):
# while True:
# todo:设置cookies的使用
headers = {
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'version': 'TYC-Web'
}
driver, id_cookie, s = login(driver)
if id_cookie:
pass
else:
continue
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
item = baseCore.redicPullData('shareHolderInfo')
# item = '9133010009205585XF|杭州大搜车汽车服务有限公司'
dic_info = {}
# 判断 如果Redis中已经没有数据,则等待
# social_code = '91110108780992804C'
if item == None:
time.sleep(30 * 60)
continue
start = time.time()
# no = item.split('|')[0]
# social_code = item.split('|')[11]
social_code = item.split('|')[0]
com_name = item.split('|')[1]
# recept_name = item.split('|')[12]
dic_info["企业信用代码(中国内地企业需填写信用代码)"] = social_code
dic_info["企业名称(企查查/天眼查)"] = com_name
"""
最大持股企业、最大持股企业原文名称、最大持股企业所属国家、持股比例、最大持股企业信用代码、最大持股企业标签
"""
if "ZZSN" in social_code:
dic_info['持股比例'] = ''
dic_info['最大股东名称'] = ''
Info.bigshearholder_insert(dic_info)
break
try:
try:
data = baseCore.getInfomation(social_code)
except:
data = []
if len(data) != 0:
id = data[0]
com_name = data[1]
xydm = data[2]
tycid = data[11]
count = data[17]
else:
# 数据重新塞入redis
# log.info(f'数据库中无该企业{social_code}')
sql = f"SELECT * FROM sys_base_enterprise WHERE social_credit_code = '{social_code}'"
cursor.execute(sql)
data = cursor.fetchone()
if data:
com_name = data[3]
xydm = data[1]
# 写入数据库
insert = "INSERT INTO EnterpriseInfo(CompanyName, SocialCode) VALUES (%s, %s)"
cursor_.execute(insert, (com_name, xydm))
cnx_.commit()
tycid = ''
else:
# 数据库中并没有该企业 需要新增
xydm = social_code
tycid = ''
if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(com_name, s) #{'state': True, 'tycData': {'id': 3406898015, 'graphId': '3406898015', 'type': 0, 'matchType': '公司名称匹配', 'comName': '上海商汤科技开发有限公司', 'name': '<em>上海商汤科技开发有限公司</em>', 'alias': '商汤', 'logo': 'https://img5.tianyancha.com/null@!f_200x200', 'claimLevel': None, 'regStatus': 0, 'taxCode': '91310115MA1HB3LY4M'}, 'reput': True}
# retData = getTycIdByXYDM("极星汽车销售有限公司", s)
if retData['state']:
tycid = retData['tycData']['id']
else:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={social_code}====重新放入redis====')
baseCore.rePutIntoR('shareHorder:Error', item)
continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('shareHorder:Error', item)
continue
log.info(f"---{xydm}----{tycid}----开始采集股东信息")
try:
charge, dic_info, num = get_html(tycid, driver, dic_info)
# 页面请求三次都失败
except:
charge = -1
t = int(time.time() * 1000)
if charge == -1:
token.updateTokeen(id_cookie, 3)
# 重新塞入redis
baseCore.rePutIntoR('shareHolderInfo', item)
log.info(f"---{xydm}----{tycid}----请求失败----重新放入redis")
time.sleep(3)
continue
elif charge == -2:
# 该企业没有股东信息
# token.updateTokeen(id_cookie, 2)
# baseCore.rePutIntoR('shareHolderInfo', item)
log.info(f"--{xydm}----{tycid}----没有股东信息或需要滑动验证----重新放入redis")
time.sleep(5)
Info.insert_into(dic_info)
continue
else:
log.info(f"---{xydm}----{tycid}")
url2 = f'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2'
url3 = f'https://capi.tianyancha.com/cloud-listed-company/listed/holder/topTen?&gid={tycid}&pageSize=20&pageNum=1&percentLevel=-100&type=1'
url1 = f'https://capi.tianyancha.com/cloud-listed-company/listed/holder/hk?date=&gid={tycid}&sortField=&sortType=-100&pageSize=10&pageNum=1&percentLevel=-100&keyword='
payload = {"gid": f"{tycid}", "pageSize": 10, "pageNum": 1, "sortField": "", "sortType": "-100", "historyType": 1}
try:
total_page2, data_page2 = post_page(url2, s, headers, payload)
except Exception as e:
log.info(e)
total_page2 = 0
data_page2 = {}
time.sleep(1)
try:
total_page3, data_page3 = get_page(url3, s, headers)
except:
total_page3 = 0
data_page3 = {}
try:
total_page1, data_page1 = get_page1(url1, s, headers)
except:
total_page1 = 0
data_page1 = {}
if total_page2 == charge:
url = 'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2?'
total_page = total_page2
data_page_one = data_page2
flag = 1
else:
if total_page3 == charge:
url = 'https://capi.tianyancha.com/cloud-listed-company/listed/holder/topTen?&gid={}&pageSize=20&pageNum={}&percentLevel=-100&type=1'
total_page = total_page3
data_page_one = data_page3
flag = 3
else:
total_page = total_page1
data_page_one = data_page1
flag = 0
if total_page == 0:
# token.updateTokeen(id_cookie, 2)
# 重新塞入redis
inserted_id = Info.bigshearholder_insert(dic_info)
# baseCore.rePutIntoR('shareHolderInfo', item)
log.info(f'==={social_code}=====总数请求失败===重新放入redis====')
continue
# todo:获取页数
log.info(f'总数为{total_page}')
# if int(total_page % 20) == 0:
# maxpage = int((total_page / 20) + 1)
# else:
# maxpage = int((total_page / 20) + 1) + 1
for page in range(1, 2):
if page == 1:
data_page = data_page_one
errorCode = data_page['errorCode']
else:
break
if errorCode == 0:
pass
else:
# token.updateTokeen(id_cookie, 2)
# 重新塞入redis
# baseCore.rePutIntoR('shareHolderInfo', item)
log.info(f'---{xydm}----{tycid}--{data_page}--股东信息请求失败')
continue
# todo:test测试
log.info(f'--{xydm}----{tycid}---')
try:
list_all = data_page['data']['holderList']
except:
try:
list_all = data_page['data']['result']
except:
list_all = data_page['data']['stockHolder']['result']
if list_all:
pass
else:
log.info(f'---{xydm}----{tycid}----没有股东信息')
# todo: 关闭连接
# res.close()
log.info(f'----flag:{flag}----')
log.info(f'-----list_all:{len(list_all)}----')
for idx,holder_info in enumerate(list_all):
shareHolderName, percent = '', ''
if flag == 1:
shareHolderName = holder_info['shareHolderName']
percent = holder_info['percent']
capitalTotal = holder_info['capitalTotal']
try:
big_tycid = holder_info['shareHolderGid']
enterprise = holder_info['shareHolderTypeOnPage']
except:
big_tycid = ""
enterprise = ""
elif flag == 3:
shareHolderName = holder_info['name']
percent = holder_info['proportion']
capitalTotal = ''
try:
big_tycid = holder_info['shareHolderGid']
enterprise = holder_info['shareHolderTypeOnPage']
except:
big_tycid = ""
enterprise = ""
else:
shareHolderName = holder_info['holder_name']
percent = holder_info['longHeldRatioWithUnit']
capitalTotal = ''
try:
big_tycid = holder_info['holder_gid']
enterprise = holder_info['type']
if enterprise == 1:
enterprise = '企业法人'
except:
big_tycid = ""
enterprise = ""
if shareHolderName and percent:
dic_info['最大股东名称'] = shareHolderName
dic_info['持股比例'] = percent
if big_tycid:
dic_info['股东名称tycid'] = big_tycid
# todo: 插入一条新纪录
log.info(dic_info)
try:
del dic_info['_id']
except:
pass
inserted_id = Info.bigshearholder_insert(dic_info)
# 推送到redis采集其他信息
if enterprise == '企业法人' or enterprise == "企业股东":
Big_item = xydm + "|" + shareHolderName + "|" + str(big_tycid)
baseCore.r.rpush('BigShareHolder:comname', Big_item)
else:
Big_item = xydm + "|" + shareHolderName
baseCore.r.rpush('BigShareHolder:person', Big_item)
log.info('=========成功======')
break
token.updateTokeen(id_cookie, 3)
# time.sleep(randint(5,10))
time.sleep(5)
except Exception as e:
# 4月28日采集失败不更新封号时间,更新使用时间
token.updateTokeen(id_cookie, 3)
# token.updateTokeen(id_cookie, 2)
log.info(f'==={social_code}=====企业核心人员采集失败===重新放入redis====')
log.info(e)
# 重新塞入redis
baseCore.rePutIntoR('shareHolderInfo', item)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
# df_img = pd.DataFrame(list_all_2)
# df_img.to_excel('企业主要人员-头像.xlsx',index=False)
if __name__ == "__main__":
doJob()
\ No newline at end of file
@@ -180,32 +180,26 @@ def doJob():
continue
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
item = baseCore.redicPullData('shareHolderInfo')
-# item = '900|微创心律管理|None|罗七一|健康科技|¥ 90 亿|¥ 90 亿|¥ 92 亿|823|861|911|ZZSN231108150127681|MicroPort Cardiac Rhythm Management International Limited|中国|None'
+# item = '91310115MA1HB3LY4M|上海商汤科技开发有限公司'
# 判断 如果Redis中已经没有数据,则等待
# social_code = '91110108780992804C'
if item == None:
time.sleep(30 * 60)
continue
start = time.time()
-no = item.split('|')[0]
-social_code = item.split('|')[11]
-recept_name = item.split('|')[12]
-dic_info = {"序号": item.split('|')[0],
-"企业名称(榜单公布)": item.split('|')[1],
-"企业别称": item.split('|')[2],
-"门人/联合创始": item.split('|')[3],
-"行业": item.split('|')[4],
-"企业估值(2022年)": item.split('|')[5],
-"企业估值(2023年)": item.split('|')[6],
-"企业估值(2024年)": item.split('|')[7],
-"2022年独角兽排名": item.split('|')[8],
-"2023年独角兽排名": item.split('|')[9],
-"2024年独角兽排名": item.split('|')[10],
-"企业信用代码(中国内地企业需填写信用代码)": item.split('|')[11],
-"企业名称(企查查)": item.split('|')[12],
-"所属国家": item.split('|')[13]
+# no = item.split('|')[0]
+# social_code = item.split('|')[11]
+social_code = item.split('|')[0]
+com_name = item.split('|')[1]
+# recept_name = item.split('|')[12]
+dic_info = {"企业信用代码(中国内地企业需填写信用代码)": social_code,
+"企业名称(企查查/天眼查)": com_name
}
+"""
+最大持股企业、最大持股企业原文名称、最大持股企业所属国家、持股比例、最大持股企业信用代码、最大持股企业标签
+"""
if "ZZSN" in social_code:
dic_info['前十大股东名称'] = ''
dic_info['持股比例'] = ''
@@ -244,7 +238,7 @@ def doJob():
tycid = ''
if tycid == None or tycid == '':
try:
-retData = getTycIdByXYDM(recept_name, s)
+retData = getTycIdByXYDM(com_name, s)
# retData = getTycIdByXYDM("极星汽车销售有限公司", s)
if retData['state']:
tycid = retData['tycData']['id']
...
"""采集最大股东信息的相关信息"""
import json
import requests, time
from bs4 import BeautifulSoup
import urllib3
from retry import retry
from getTycId import getTycIdByXYDM
import sys
sys.path.append('../../base')
import BaseCore
baseCore = BaseCore.BaseCore()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
cnx = baseCore.cnx_
cursor = baseCore.cursor_
list_all_1 = []
list_all_2 = []
taskType = '天眼查/股东信息'
from classtool import Token, Info
token = Token()
Info = Info()
@retry(tries=3, delay=1)
def get_html(tycid, driver, dic_info):
url = f"https://www.tianyancha.com/company/{tycid}"
driver.get(url=url)
time.sleep(3)
page_source = driver.page_source
soup = BeautifulSoup(page_source, 'html.parser')
xydm = soup.find('span', attrs={'class': 'index_detail-credit-code__fH1Ny'}).text
dic_info['最大持股企业信用代码'] = xydm
script = soup.find('script', attrs={'id': '__NEXT_DATA__'}).text
script = json.loads(script)
script = script['props']['pageProps']['dehydratedState']['queries'][0]['state']['data']['data']['tagListV2']
tag_list = []
filter_list = ['存续', '曾用名', '竞争风险', '司法案件', '合作风险', '股权出质', '仍注册']
for tag in script:
if tag['title'] in filter_list:
continue
if tag['color'] == '#FF463C':
continue
tag_list.append(tag['title'])
dic_info['最大持股企业标签'] = tag_list
return dic_info
@retry(tries=5, delay=3)
def get_page(url, s, headers):
ip = baseCore.get_proxy()
res = s.get(url=url, headers=headers, proxies=ip, timeout=(5, 10))
if res.status_code != 200:
raise
data_page = res.json()
# log.info(f'接口获取总数---{data_page}')
try:
total_page_ = data_page['data']['total']
except:
raise
return total_page_, data_page
@retry(tries=5, delay=3)
def get_page1(url, s, headers):
ip = baseCore.get_proxy()
res = s.get(url=url, headers=headers, proxies=ip, timeout=(5, 10))
if res.status_code != 200:
raise
data_page = res.json()
# log.info(f'接口获取总数---{data_page}')
try:
total_page_ = data_page['data']['stockHolder']['total']
except:
raise
return total_page_, data_page
@retry(tries=5, delay=3)
def post_page(url, s, headers, payload):
ip = baseCore.get_proxy()
res = s.post(url=url, headers=headers, data=json.dumps(payload), proxies=ip, timeout=(5, 10))
if res.status_code != 200:
raise
json_info = res.json()
try:
total_page_ = json_info['data']['total']
except:
raise
return total_page_, json_info
from selenium import webdriver
def create_driver():
path = r'D:\soft\msedgedriver.exe'
# options = webdriver.EdgeOptions()
options = {
"browserName": "MicrosoftEdge",
"ms:edgeOptions": {
"extensions": [], "args": ["--start-maximized"] # 添加最大化窗口运作参数
}
}
session = webdriver.Edge(executable_path=path, capabilities=options)
return session
def login(driver):
cookies = {}
cookies_list, id_cookie, user_name = token.get_cookies()
if cookies_list:
pass
else:
log.info("没有账号了,等待30分钟")
time.sleep(30 * 60)
return '', '', ''
log.info(f'=====当前使用的是{user_name}的cookie======')
for cookie in cookies_list:
driver.add_cookie(cookie)
time.sleep(3)
driver.refresh()
time.sleep(3)
for cookie in cookies_list:
cookies[cookie['name']] = cookie['value']
s = requests.Session()
s.cookies.update(cookies)
return driver, id_cookie, s
def doJob():
# for social_code in social_code_list:
driver = create_driver()
url = 'https://www.tianyancha.com/'
driver.get(url)
driver.maximize_window()
for i in range(1000):
# while True:
# todo:设置cookies的使用
headers = {
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'version': 'TYC-Web'
}
driver, id_cookie, s = login(driver)
if id_cookie:
pass
else:
continue
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
item = baseCore.redicPullData('BigShareHolder:comname')
dic_info = {}
# item = '91310115MA1HB3LY4M|上海阡伦科技有限公司|3476165132'
# 判断 如果Redis中已经没有数据,则等待
# Big_item = inserted_id + "|" + shareHolderName + "|" + big_tycid
if item == None:
time.sleep(30 * 60)
continue
start = time.time()
no = item.split('|')[0]
# todo:根据信用代码找到该条信息
tycid = item.split('|')[2]
com_name = item.split('|')[1]
try:
if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(com_name, s)
# retData = getTycIdByXYDM("极星汽车销售有限公司", s)
if retData['state']:
tycid = retData['tycData']['id']
xydm = retData['tycData']['taxCode']
else:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(com_name, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={com_name}====重新放入redis====')
baseCore.rePutIntoR('BigShareHolder:Error', item)
continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(com_name, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('BigShareHolder:Error', item)
continue
log.info(f"---{com_name}----{tycid}----开始采集股东信息")
try:
dic_info = get_html(tycid, driver, dic_info)
charge = 0
# 页面请求三次都失败
except:
charge = -1
if charge == -1:
token.updateTokeen(id_cookie, 3)
# 重新塞入redis
baseCore.rePutIntoR('BigShareHolder:comname', item)
log.info(f"---{com_name}----{tycid}----请求失败----重新放入redis")
time.sleep(3)
continue
else:
t = int(time.time() * 1000)
Info.bigupdate_info(no, dic_info)
except Exception as e:
token.updateTokeen(id_cookie, 3)
# token.updateTokeen(id_cookie, 2)
log.info(f'==={com_name}=====企业核心人员采集失败===重新放入redis====')
log.info(e)
# 重新塞入redis
baseCore.rePutIntoR('BigShareHolder:comname', item)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(com_name, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
# df_img = pd.DataFrame(list_all_2)
# df_img.to_excel('企业主要人员-头像.xlsx',index=False)
if __name__ == "__main__":
doJob()
\ No newline at end of file
@@ -74,6 +74,7 @@ if __name__ == "__main__":
# loadinfo = [token,cookies]
# 保存到数据库中
+# insert = f"insert into weixin_tokenCookies_person (token,cookies,create_time,fenghao_time,user_name,update_time) values ('{token}','{escape_string(cookies)}',now(),DATE_SUB(NOW(), INTERVAL 1 DAY),'{user_name}',now())"
insert = f"insert into weixin_tokenCookies (token,cookies,create_time,fenghao_time,user_name,update_time) values ('{token}','{escape_string(cookies)}',now(),DATE_SUB(NOW(), INTERVAL 1 DAY),'{user_name}',now())"
cursor_.execute(insert)
cnx_.commit()
...
# -*- coding: utf-8 -*-
'''
成功100 发送数据失败200 请求失败400 文章内容为空500 处理style标签失败700 发布内容不存在800 图片处理失败300、600
'''
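# --- editor's note: a small reference map distilled from the legend above; the English
# descriptions are illustrative only and this dict is not referenced elsewhere in the script.
WX_LINK_STATE = {
    100: 'success',
    200: 'failed to send data (Kafka)',
    300: 'image processing failed (upload/replace error)',
    400: 'request failed',
    500: 'article content empty',
    600: 'image processing failed (unexpected error)',
    700: 'failed to clean style attributes',
    800: 'published content no longer available',
}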
import re
import requests, time, random, json, pymysql, redis
import urllib3
from bs4 import BeautifulSoup
from obs import ObsClient
from kafka import KafkaProducer
from retry import retry
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
# cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
def rePutIntoR(item):
r.rpush('WeiXinGZH:linkid', item)
def updatewxLink(link,info_source_code,state):
updateSuccess = f"update wx_link set state= {state} where link='{link}' and info_source_code='{info_source_code}' "
cursor_.execute(updateSuccess)
cnx_.commit()
def getjsonInfo():
# todo:从redis中获取一条
# linkid = baseCore.redicPullData('WeiXinGZH:linkid')
# 测试使用
linkid = True
# if linkid:
# pass
# else:
# log.info('-----没有数据了-----')
# return False, False
# #从数据库中获取信息 一条
# select_sql = f"select * from wx_link where state=0 and id= '{linkid}'"
# cursor_.execute(select_sql)
# row = cursor_.fetchone()
# cnx_.commit()
# if row:
# pass
# else:
# log.info('-----没有数据了-----')
# return False, False
# dict_json = {
# 'sid':row[1],
# 'site_uri':row[2],
# 'site_name':row[3],
# 'info_source_code':row[4],
# 'title':row[5],
# 'publish_time':row[6],
# 'link':row[7]
# }
dict_json = {
'sid': 111,
'site_uri': "",
'site_name': "",
'info_source_code': "",
'title': "测试",
'publish_time': "",
'link': "https://mp.weixin.qq.com/s?__biz=MjM5MDIxNjczNA==&mid=2652863674&idx=2&sn=e6c37cdffb9eaeefc652df275bd29381&chksm=bda3fbcb8ad472ddb6609d7a9b34091c1ea1c5cf009b1a6734e9ee883960279ed6763e574050#rd"
}
# # 拿到一条数据 更新状态
# update_sql = f"update wx_link set state=1 where link='{row[7]}' and info_source_code='{row[4]}' "
# cursor_.execute(update_sql)
# cnx_.commit()
return dict_json, linkid
@retry(tries=20, delay=2)
def getrequest(url_news):
# ip = baseCore.get_proxy()
# res_news = requests.get(url_news, proxies=ip, timeout=20)
res_news = requests.get(url_news, timeout=20)
log.info(res_news.status_code)
if res_news.status_code != 200:
raise
return res_news
def get_info(dict_json, linkid):
# list_all_info = []
# num_caiji = 0
kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
news_title = dict_json['title']
sid = dict_json['sid']
news_date = dict_json['publish_time']
origin = dict_json['site_name']
url_news = dict_json['link']
info_source_code = dict_json['info_source_code']
# while True:
# try:
# ip = baseCore.get_proxy()
# res_news = requests.get(url_news, proxies=ip, timeout=20)
# break
# except:
# time.sleep(3)
#400请求失败
# updatewxLink(url_news, info_source_code, 400)
# return False
# 修改请求方法,retry 3次
try:
res_news = getrequest(url_news)
# print(res_news)
except:
# 修改回原状态,重新放入redis
# updatewxLink(url_news, info_source_code, 0)
log.info(f'{origin}---{news_date}--{news_title}---请求失败-- 重新放入redis')
# baseCore.rePutIntoR('WeiXinGZH:linkid', linkid)
# try:
# res_news = requests.get(url_news, timeout=20)
# except:
# # 400请求失败
# updatewxLink(url_news, info_source_code, 400)
return False
soup_news = BeautifulSoup(res_news.content, 'html.parser')
if '此内容发送失败无法查看' in soup_news.text or '该页面不存在' in soup_news.text or '该内容已被发布者删除' in soup_news.text or '此内容因违规无法查看' in soup_news.text:
log.info(f'--errorCode:800--{origin}---{news_date}---{news_title}----内容无法查看')
# updatewxLink(url_news, info_source_code, 800)
return False
try:
news_html = soup_news.find('div', {'id': 'js_content'})
news_html['style'] = 'width: 814px ; margin: 0 auto;'
#del news_html['style']
news_html = rm_style_attr(news_html)
del news_html['id']
del news_html['class']
except Exception as e:
log.info(f'--errorCode:700--{url_news}-----------{e}')
# log.error(f'{url_news}-----{info_source_code}')
# updatewxLink(url_news, info_source_code, 0)
log.info(f'{origin}---{news_date}--{news_title}---style标签解析失败---重新放入redis')
# baseCore.rePutIntoR('WeiXinGZH:linkid', linkid)
return False
try:
news_content = news_html.text
except:
log.info(f'--------内容为空--------{url_news}--------')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
news_title,
url_news,
news_html,
'文章内容为空',
time_now
]
insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
# updatewxLink(url_news, info_source_code, 500)
return False
list_img = news_html.find_all('img')
for num_img in range(len(list_img)):
img_one = list_img[num_img]
url_src = img_one.get('data-src')
# print(url_src)
if 'gif' in url_src:
url_img = ''
img_one.extract()
else:
try:
try:
name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
except:
img_one.extract()
continue
try:
res = requests.get(url_src, timeout=20)
except:
img_one.extract()
continue
resp = None
for i in range(10):
try:
resp = obsClient.putContent('zzsn', name_img, content=res.content)
break
except:
time.sleep(2)
if resp:
pass
else:
img_one.extract()
continue
try:
url_img = resp['body']['objectUrl']
str_url_img = f'<img src="{url_img}">'
except Exception as e:
log.info(f'--errorCode:300--{url_news}-----------{e}')
# updatewxLink(url_news, info_source_code, 300)
return False
try:
img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
except Exception as e:
log.info(f'--errorCode:300--{url_news}-----------{e}')
# updatewxLink(url_news, info_source_code, 300)
return False
except Exception as e:
log.info(f'--errorCode:600--{url_news}-----------{e}')
# updatewxLink(url_news, info_source_code, 600)
return False
for tag in news_html.descendants:
try:
del tag['style']
except:
pass
list_section = news_html.find_all('section')
for section in list_section:
section.name = 'div'
news_html = deletek(news_html)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info = {
'sid': sid,
'title': news_title,
'content': news_content,
'contentWithtag': str(news_html),
'summary': '',
'author': '',
'origin': origin,
'publishDate': news_date,
'sourceAddress': url_news,
'source': '11',
'createDate': time_now
}
log.info(dic_info)
# for nnn in range(0, 3):
# try:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
# kafka_result = producer.send("crawlerInfo_test", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
# kafka_time_out = kafka_result.get(timeout=10)
# # add_url(sid, url_news)
# break
# except:
# time.sleep(5)
# log.info('------数据发送kafka失败------')
# updatewxLink(url_news, info_source_code, 200)
# continue
#
# list_all_info.append(dic_info)
# time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# dic_info2 = {
# 'infoSourceId': sid,
# 'code': info_source_code,
# 'num': num_caiji,
# 'collectTime': kaishi_time,
# 'dispatcherTime': time_now,
# 'dispatcherStatus': '1',
# 'source': '1',
# }
# for nnn2 in range(0, 3):
# try:
# producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2,7,0))
# kafka_result2 = producer2.send("collectionAndDispatcherInfo",
# json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
# break
# except:
# time.sleep(5)
# continue
# updatewxLink(url_news, info_source_code, 100)
return True
def rm_style_attr(soup):
# 查找所有含有style属性的标签
style_tags = soup.find_all(style=True)
# 遍历每个style标签
for style_tag in style_tags:
try:
# 使用正则表达式替换
styleattr = style_tag['style']
styleattr = re.sub(r'visibility:(?s).{1,}?;', '', styleattr)
styleattr = re.sub(r'font-family:(?s).{1,}?;', '', styleattr)
styleattr = re.sub(r'color:(?s).{1,}?;', '', styleattr)
styleattr = re.sub(r'font-size:(?s).{1,}?;', '', styleattr)
style_tag['style'] = styleattr
except:
continue
# first_div = soup.select('div[id="js_content"]')
# # 设置style属性
# first_div['style'] = 'width: 814px ; margin: 0 auto;'
first_div = soup.select('div[id="js_content"]')
if first_div:
first_div = first_div[0] # 获取第一个匹配的元素
first_div['style'] = 'width: 814px ; margin: 0 auto;' # 设置style属性
return soup
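# --- illustrative check (editor's addition, not part of this commit): the markup below is
# made up; it shows that color/font-size declarations are stripped while other declarations
# survive, and that the js_content div gets the fixed width/margin style.
def _demo_rm_style_attr():
    demo = BeautifulSoup(
        '<div id="js_content"><p style="color:red;font-size:12px;text-align:center;">hi</p></div>',
        'html.parser')
    print(rm_style_attr(demo))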
# def deletek(soup):
# # 删除空白标签(例如<p></p>、<p><br></p>, img、video、br除外)
# for i in soup.find_all(lambda tag: len(tag.get_text()) == 0 and tag.name not in ["img", "video", "br"] and tag.name != "br" or tag.get_text()==' '):
# for j in i.descendants:
# if j.name in ["img", "video", "br"]:
# break
# else:
# i.decompose()
#
# return soup
def deletek(soup):
# 删除空白标签(例如<p></p>、<p><br></p>, img、video、br除外)
for i in soup.find_all(lambda tag: len(tag.get_text(strip=True)) == 0 and tag.name not in ["img", "video"]):
if i.name == "p":
# 检查 <p> 标签内是否只包含 <br>
only_br = True
for child in i.children:
if child.name and child.name != "br":
only_br = False
break
if only_br:
i.decompose()
else:
# 检查标签是否包含 img 或 video 子标签
contains_img_or_video = False
for child in i.descendants:
if child.name in ["img", "video"]:
contains_img_or_video = True
break
if not contains_img_or_video:
i.decompose()
return soup
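# --- illustrative check (editor's addition, not part of this commit): empty tags are dropped,
# while tags that only wrap an <img> (or <video>) are kept even though they contain no text.
def _demo_deletek():
    demo = BeautifulSoup('<div><p></p><p><img src="a.png"/></p><p>text</p></div>', 'html.parser')
    print(deletek(demo))  # the empty <p> is removed; the image wrapper and the text paragraph stay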
if __name__=="__main__":
num_caiji = 0
list_all_info = []
while True:
#一次拿取一篇文章
# todo: 从redis拿数据 更新mysql状态
dict_json, linkid =getjsonInfo()
try:
if dict_json:
if get_info(dict_json, linkid):
num_caiji = num_caiji + 1
log.info(f'-----已采集{num_caiji}篇文章---来源{dict_json["site_name"]}----')
else:
continue
except:
rePutIntoR(linkid)
baseCore.close()
\ No newline at end of file
import pandas as pd
@@ -2,20 +2,20 @@ import pandas as pd
import pymongo
# 7649
data_list = []
-db_stroage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN['新华丝路-丝路商机100+']
+db_stroage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='shencai', password='shencai_zzsn008').ZZSN['国务院问答对']
# datas = db_stroage.find({"内容": {"$ne": None, "$exists": True}})
# 导出标签是空的数据
datas = db_stroage.find()
link = []
for data in datas:
del data['_id']
-del data['id']
-# if data['标题'] not in link:
-# data_list.append(data)
-# link.append(data['标题'])
+# del data['id']
+if data['问题']:
data_list.append(data)
+else:
+continue
# print(data)
print(len(data_list))
df = pd.DataFrame(data_list)
-df.to_excel('./新华丝路-丝路投资2.xlsx',index=False)
+df.to_excel('./国务院问答对.xlsx',index=False)
\ No newline at end of file
# 读取表中的数据,转化成list
@@ -44,7 +44,8 @@ def getrequest(href, headers):
def classify_report_type(title):
-if "年年度报告" in title or re.match(r'\d{4}年度报告', title):
+type_pattern = r'(.*?)\d{4}年?(年度财务报告|年报|年度报告)'
+if "年年度报告" in title or re.match(type_pattern, title):
return "年度报告"
elif "半年" in title:
return "半年度报告"
@@ -95,15 +96,16 @@ def parase(com_name, social_code, dataJson):
"报告年份": year
}
db_storage2.insert_one(dic_info)
-time.sleep(1)
+time.sleep(2)
if __name__ == "__main__":
dataList = getcomlist(file_path, sheet_name)
# print(dataList)
for item in enumerate(dataList):
-social_code = item[1]
-com_name = item[2]
+# print(item)
+social_code = item[1][1]
+com_name = item[1][2]
print(f"正在采集:{com_name}")
href = url.format(com_name, 1)
dataJson = getrequest(href, headers)
@@ -116,5 +118,5 @@ if __name__ == "__main__":
href_page = url.format(com_name, page)
dataJson_page = getrequest(href_page, headers)
parase(com_name, social_code, dataJson_page)
-time.sleep(2)
+time.sleep(5)
++ "b/\345\233\275\345\212\241\351\231\242\351\227\256\347\255\224\345\257\271\345\244\204\347\220\206/qa\351\200\211\347\231\273.py"