Commit b1673733 Author: 刘伟刚
......@@ -678,9 +678,19 @@ class BaseCore:
id = selects[0]
return id
# 更新企业的CIK
def updateCIK(self,social_code,cik):
try:
sql = "UPDATE EnterpriseInfo SET CIK = %s WHERE SocialCode = %s"
cnn = self.pool_caiji.connection()
cursor = cnn.cursor()
cursor.execute(sql, (cik, social_code))
cnn.commit()
cursor.close()
cnn.close()
except Exception as e:
log = self.getLogger()
log.info(f'======保存企业CIK失败====={e}')
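# Usage sketch (an assumption, not part of the original diff): once the EDGAR lookup in the SEC
# collector resolves a company's CIK, it can be cached with
#   baseCore.updateCIK(social_code, cik)
# so later runs can skip the lookup; baseCore is assumed to be a BaseCore instance.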
......
......@@ -116,33 +116,6 @@ def NoticeEnterprise_task():
print('定时采集异常', e)
pass
#企业年报
def AnnualEnterprise():
cnx,cursor = connectSql()
# 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
#企业年报定时任务
def AnnualEnterprise_task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每年执行一次
scheduler.add_job(AnnualEnterprise, 'cron', second='*/10')
try:
# 定时开始前执行一次
AnnualEnterprise()
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
#企业基本信息
def BaseInfoEnterprise():
cnx,cursor = connectSql()
......@@ -245,6 +218,33 @@ def weixin_task():
print('定时采集异常', e)
pass
#企业年报证监会
def AnnualEnterprise():
cnx,cursor = connectSql()
# 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
#企业年报定时任务
def AnnualEnterprise_task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每年执行一次
scheduler.add_job(AnnualEnterprise, 'cron', second='*/10')
try:
# 定时开始前执行一次
AnnualEnterprise()
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
# 企业年报——雪球网
def AnnualEnterpriseXueQ():
cnx,cursor = connectSql()
......@@ -271,6 +271,21 @@ def AnnualEnterpriseXueQ_task():
print('定时采集异常', e)
pass
#企业年报--美国证券交易委员会
def AnnualEnterpriseUS():
cnx,cursor = connectSql()
# 获取美股企业
us_query = "select SocialCode from EnterpriseInfo where Place = '2' and SecuritiesType = '美股' and SecuritiesCode is not null"
# us_query = "select SocialCode from EnterpriseInfo where Place = '2' and SecuritiesType = '美股' and SecuritiesCode = 'BP' "
#ZZSN22080900000025
cursor.execute(us_query)
us_result = cursor.fetchall()
us_social_list = [item[0] for item in us_result]
print('=======')
for item in us_social_list:
r.rpush('AnnualEnterprise:usqy_socialCode', item)
closeSql(cnx,cursor)
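# The enqueue functions above only push social codes into Redis; a separate worker drains the
# queue. A minimal consumer sketch (assumes the BaseCore.redicPullData helper used elsewhere in
# this repo):
#   while True:
#       social_code = baseCore.redicPullData('AnnualEnterprise:usqy_socialCode')
#       if not social_code:
#           time.sleep(20)
#           continue
#       # ... collect the annual report for this company ...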
#国外企业基本信息 redis中放入id
def BaseInfoEnterpriseAbroad():
cnx,cursor = connectSql()
......@@ -301,29 +316,45 @@ def BaseInfoAbroad_task():
def FBS():
cnx,cursor = connectSql()
# todo:调整为获取福布斯的数据库
# gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
# cursor.execute(gw_query)
# gw_result = cursor.fetchall()
gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
cursor.execute(gw_query)
gw_result = cursor.fetchall()
#获取国内企业
gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
# gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
# cursor.execute(gn_query)
# gn_result = cursor.fetchall()
#
# gn_social_list = [item[0] for item in gn_result]
gw_social_list = [item[0] for item in gw_result]
for item in gw_social_list:
r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
# for item in gn_social_list:
# if not r.exists(item):
# # r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# # r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('AnnualEnterprise:gnshqy_socialCode',item)
# # r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# # r.rpush('FinanceFromEast:eastfinance_socialCode',item)
closeSql(cnx,cursor)
#省属国有企业 盟市国有企业
def MengZhi():
cnx, cursor = cnn11()
gn_query = "select * from t_0906 a where not exists (select 1 from sys_base_enterprise_executive b where a.xydm =b.social_credit_code)"
# gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where b.type=4 and a.SocialCode=b.SocialCode;"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
# gw_social_list = [item[0] for item in gw_result]
# for item in gw_social_list:
# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
for item in gn_social_list:
if not r.exists(item):
# r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
r.rpush('AnnualEnterprise:gnshqy_socialCode',item)
# r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# r.rpush('FinanceFromEast:eastfinance_socialCode',item)
closeSql(cnx,cursor)
# r.rpush('BaseInfoEnterpriseMz:gnqy_socialCode', item)
r.rpush('CorPersonEnterprise:gnqy_socialCode', item)
closeSql(cnx, cursor)
#将IPO的国外股票代码放到redis中
def yahooCodeFromSql():
......@@ -366,7 +397,9 @@ if __name__ == "__main__":
# NewsEnterprise_task()
# NewsEnterprise()
# BaseInfoEnterprise()
FBS()
# FBS()
# MengZhi()
AnnualEnterpriseUS()
# NoticeEnterprise_task()
# AnnualEnterprise_task()
# NoticeEnterprise()
......
import json
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
url = 'http://www.imlooker.com/v4/company/142660.html'
req = requests.get(url)
soup = BeautifulSoup(req.content,'html.parser')
# print(soup)
info = soup.find('div',id='mydiv1')
company_name = '台玻集团'
WebSite =info.find('div').text.split('官网:')[1]
#简介
info.find('div').decompose()
briefInfo = info.text.strip()
table_info = soup.find_all('div',class_='com_340')[1]
# print(table_info)
td_list = table_info.find_all('td')
# print(td_list)
incorporationDate = td_list[1].text
businessRange = td_list[3].text
scale = td_list[11].text
address = td_list[13].text
aa_dict = {
'qccId': '', # 企查查企业id
'name': company_name, # 企业名称
'shortName': '', # 企业简称
'socialCreditCode': '', # 统一社会信用代码
'legalPerson': '', # 法定代表人
'officialPhone': '', # 电话
'officialUrl': WebSite, # 官网
'officialEmail': '', # 邮箱
'briefInfo': briefInfo, # 简介
'registerStatus': '', # 登记状态
'incorporationDate': incorporationDate, # 成立日期
'capital': '', # 注册资本
'paidCapital': '', # 实缴资本
'approvalDate': '', # 核准日期
'organizationCode': '', # 组织机构代码
'registerNo': '', # 工商注册号
'taxpayerNo': '', # 纳税人识别号
'type': '', # 企业类型
'businessStartDate': '', # 营业期限自
'businessEndDate': '', # 营业期限至
'taxpayerQualification': '', # 纳税人资质
'industry': '', # 所属行业
'region': '',
'province': '台湾省', # 所属省
'city': '台北市', # 所属市
'county': '松山区', # 所属县
'registerDepartment': '', # 登记机关
'scale': scale, # 人员规模
'insured': '', # 参保人数
'beforeName': '', # 曾用名
'englishName': 'Taiwan Glass Group', # 英文名
'importExportEnterpriseCode': '', # 进出口企业代码
'address': address, # 地址
'businessRange': businessRange, # 经营范围
'status': 0, # 状态
}
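# The dict above is the message body pushed to the Kafka topic "regionInfo" below; fields left as
# empty strings are simply not available from this source page.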
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("regionInfo", json.dumps(aa_dict, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
except Exception as e:
print('kafka传输失败', e)
......@@ -333,7 +333,7 @@ if __name__ == '__main__':
start_time = time.time()
# 获取企业信息
# social_code = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
social_code = '91110000802100433B'
social_code = '91330000734530895W'
if social_code == '':
time.sleep(20)
continue
......
......@@ -37,11 +37,15 @@ def find_id_by_name(start,token,name):
time.sleep(5)
continue
time.sleep(2)
#{'status': 40101, 'message': '无效的sessionToken!'}
#{'status': 40101, 'message': '无效的sessionToken!'} {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'}
if resp_dict['status']==40101:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
if resp_dict['status']==401:
KeyNo = False
log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
try:
if resp_dict['result']['Result']:
result_dict = resp_dict['result']['Result'][0]
......
"""
CNKI (知网) paper collection, driven by simulated clicks; the site bans IPs
"""
import pymysql
import requests,re,time,random
import pandas as pd
from selenium import webdriver
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from base.BaseCore import BaseCore
baseCore = BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
def get_proxy():
sql = "select proxy from clb_proxy"
cursor.execute(sql)
proxy_lists = cursor.fetchall()
ip_list = []
for proxy_ in proxy_lists:
ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
proxy_list = []
for str_ip in ip_list:
str_ip_list = str_ip.split('-')
proxyMeta = "http://%(host)s:%(port)s" % {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
proxy = {
"HTTP": proxyMeta,
"HTTPS": proxyMeta
}
proxy_list.append(proxy)
return proxy_list
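# Note: requests matches proxy keys against the lowercase URL scheme, so the upper-case 'HTTP'/'HTTPS'
# keys above have to be remapped before use with requests. A hedged usage sketch:
#   proxy_item = random.choice(get_proxy())   # e.g. {'HTTP': 'http://1.2.3.4:8080', 'HTTPS': ...}
#   resp = requests.get(url, proxies={'http': proxy_item['HTTP'], 'https': proxy_item['HTTPS']}, timeout=10)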
# 设置浏览器启动参数
capabilities = webdriver.DesiredCapabilities.CHROME.copy()
selenium_proxy = Proxy()
selenium_proxy.proxy_type = ProxyType.MANUAL
selenium_proxy.http_proxy = selenium_proxy.ssl_proxy = random.choice(get_proxy())['HTTP'].replace('http://', '')
selenium_proxy.add_to_capabilities(capabilities)
info = pd.read_excel('全球创新指标数据(1).xlsx')
enterprise_name_list = []
industry_list = []
for i in range(info.shape[0]):
# print(info['contrast_name'][i])
if info['contrast_name'][i]=='发表论文数量' :
enterprise_name = info['enterprise_name'][i]
if enterprise_name == '中国石油天然气股份有限公司':
pass
else:
continue
industry = info['industry'][i]
industry_list.append(industry)
enterprise_name_list.append(enterprise_name)
df_all = pd.DataFrame({'公司名称':enterprise_name_list,
'行业':industry_list})
df_all['文章发表数'] = ''
# for year in range(2022,1989,-1):
# df_all[f'{year}'] = ''
# print(df_all)
list_one_info = []
def get_num(com_name,com_industry):
url = f'https://kns.cnki.net/kns8/DefaultResult/Index?dbcode=CFLQ&kw={com_name}&korder=AF'
browser.get(url) # 跳到指定页面
time.sleep(2)
btn = browser.find_element(By.XPATH, '/html/body/div[3]/div[1]/div/div/a/span')
btn.click()
print('点击1成功')
time.sleep(3)
btn2 = browser.find_element(By.XPATH,'//*[@id="divGroup"]/dl[3]/dt')
btn2.click()
print("点击2成功")
time.sleep(1)
page_source = browser.page_source # 获取页面信息
soup = BeautifulSoup(page_source, 'html.parser')
num_all = soup.find_all('div', {'class': 'resultlist'})[3].find('ul').find_all('li')
if num_all:
for li in num_all:
year = li.find('a').text
num = li.find('span').text.split('(')[1].split(')')[0]
dic_json = {
'enterprise_name':com_name,
'year':year,
'num':num,
'source':'国内外企业发布文章数量来源:中国知网',
'industry':com_industry
}
list_one_info.append(dic_json)
else:
dic_json = {
'enterprise_name': com_name,
'year': '',
'num': '',
'source': '国内外企业发布文章数量来源:中国知网',
'industry': com_industry
}
list_one_info.append(dic_json)
return list_one_info
chromedriver = r'D:\Chrome\chromedriver.exe'
browser = webdriver.Chrome(chromedriver, desired_capabilities=capabilities)
for i in range(0,len(df_all)):
com_name = df_all['公司名称'][i]
com_industry=df_all['行业'][i]
try:
list_one_info = get_num(com_name,com_industry)
except:
continue
print(list_one_info)
df_info = pd.DataFrame(list_one_info)
df_info.to_excel('年份-论文发表数量.xlsx',index=False)
"""
Open the SEC site -> [FILINGS] -> [Company Filing] -> enter the securities code -> treat 10-K and 20-F filings as annual reports
"""
import json
import re
import time
from base.BaseCore import BaseCore
baseCore = BaseCore()
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from selenium import webdriver
url = 'https://www.sec.gov/edgar/browse/?CIK=1815846&owner=exclude'
def spider(com_name,cik):
url = f'https://www.sec.gov/edgar/browse/?CIK={cik}&owner=exclude'
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
# print(soup)
select_ann = soup.find_all('tr', class_='odd')
for tr in select_ann:
form_type = tr.find('td').text
if form_type in ('10-K', '20-F'):  # per the file docstring, both 10-K and 20-F count as annual reports
# print(tr)
# 获取原文链接
href = tr.find('a', class_='document-link')['href']
print(href)
if 'ix?doc' in href:
href = 'https://www.sec.gov/' + href.split('/ix?doc=/')[1]
else:
href = 'https://www.sec.gov' + href
print(href)
# 获取发布时间
a_list = tr.find_all('a')
# print(a_list)
for a in a_list:
text = a.text
match = re.search(pattern, text)
if match:
pub_date = match.group(0)
# print(pub_date)
year = pub_date[:4]
break
else:
pub_date = ''
year = ''
# 根据年报的链接,请求年报内容,不需要上传文件服务器,直接发送kafka
browser.get(href)
time.sleep(3)
i_page_source = browser.page_source
i_soup = BeautifulSoup(i_page_source, 'html.parser')
# print(i_page_source)
content = i_soup.text
# 采集下来正文内容,直接传输kafka
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
title = f'{com_name}:{year}年年度报告'
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': i_page_source,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': 'SEC美国证券交易委员会',
'publishDate': pub_date,
'sid': '1684032033495392257',
'sourceAddress': href, # 原文链接
'summary': '',
'title': title,
'type': 1,
'socialCreditCode': social_code,
'year': year
}
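# dic_news above is the message intended for the "researchReportTopic" Kafka producer; the send
# itself is still commented out below, so nothing is published yet.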
# print(dic_news)
# 将相应字段通过kafka传输保存
# try:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
# kafka_result = producer.send("researchReportTopic",
# json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
#
# print(kafka_result.get(timeout=10))
#
# dic_result = {
# 'success': 'ture',
# 'message': '操作成功',
# 'code': '200',
# }
# print(dic_result)
#
# except Exception as e:
# dic_result = {
# 'success': 'false',
# 'message': '操作失败',
# 'code': '204',
# 'e': e
# }
def getrequest(social_code,url,headers,data):
#通过请求post接口获取企业的CIK
response = requests.post(url=url, headers=headers, data=data) # ,proxies=ip)
response.encoding = response.apparent_encoding
# 检查响应状态码
if response.status_code == 200:
# 请求成功,处理响应数据
# print(response.text)
result = response.json()
# print(result)
pass
else:
# 请求失败,输出错误信息
print('请求失败:', response.status_code, response.text)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
result = ''
return result
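# For reference, the caller in __main__ assumes a response shaped roughly like
#   {"hits": {"hits": [{"_id": "<CIK>", "_source": {"tickers": "BP", ...}}, ...]}}
# (inferred from how the CIK is picked below; not an official schema description).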
#模拟浏览器
chromedriver = "D:/chrome/chromedriver.exe"
browser = webdriver.Chrome(chromedriver)
pattern = r"\d{4}-\d{2}-\d{2}"
if __name__ == '__main__':
headers = {
'authority': 'efts.sec.gov',
'method': 'POST',
'path': '/LATEST/search-index',
'scheme': 'https',
'accept': '*/*',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'content-length': '34',
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'origin': 'https://www.sec.gov',
'referer': 'https://www.sec.gov/',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'
}
url = 'https://efts.sec.gov/LATEST/search-index'
num = 0
taskType = '企业年报/雪球网'
while True:
start_time = time.time()
social_code = ''
# if not social_code:
# time.sleep(20)
# continue
# if social_code == 'None':
# time.sleep(20)
# continue
# if social_code == '':
# time.sleep(20)
# continue
# dic_info = baseCore.getInfomation(social_code)
# count = dic_info[15]
# code = dic_info[3]
# com_name = dic_info[4]
# if code is None:
# exeception = '股票代码为空'
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
# continue
code = 'BP'
com_name = 'BP'  # placeholder for debugging; the production flow takes the name from dic_info above
#"MNSO" post请求 获取企业CIK
# payload = {"keysTyped":f"{code}","narrow":flag}
payload = {"keysTyped": "BP", "narrow":True}
data = json.dumps(payload)
result = getrequest(social_code,url,headers,data)
# print(result)
#判断接口返回的数据哪一条是该企业 根据股票代码
tickers = result['hits']['hits']
cik = ''
for ticker in tickers:
i_t = ticker['_source']['tickers']
if i_t == code:
cik = ticker['_id']
print(cik)
break
# break
if cik:
spider(com_name, cik)
break
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
print(soup)
select_ann = soup.find_all('tr',class_='odd')
for tr in select_ann:
want_type = tr.find('td').text
if want_type=='20-F':
print('yes')
#获取原文链接
td = tr.find('td').find('a',class_='document-link')['title_href']
print(td)
+++ /dev/null
import requests, re, time, pymysql
from bs4 import BeautifulSoup as bs
from fdfs_client.client import get_tracker_conf, Fdfs_client
from base import BaseCore
baseCore = BaseCore.BaseCore()
requests.adapters.DEFAULT_RETRIES = 3
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/雪球网'
def tableUpdate(year, com_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by, create_time):
sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s'''
cursor.execute(sel_sql, (item_id, year))
selects = cursor.fetchone()
if selects:
print(f'{com_name},{year}已存在')
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, com_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by,
create_time)
cursor.execute(Upsql, values) # 插入
cnx.commit() # 提交
print("更新完成:{}".format(Upsql))
def getContent(social_code, com_name, code,start_time):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
}
comp = re.compile(r'-?[1-9]\d*')
num = 1
ip = baseCore.get_proxy()
url_1 = f'https://vip.stock.finance.sina.com.cn/corp/go.php/vCB_Bulletin/stockid/{code}/page_type/ndbg.phtml'
res_1 = requests.get(url_1, proxies=ip)
soup = bs(res_1.content, 'html.parser',from_encoding='gb2312')
# 获取年度报告列表
try:
list_all = soup.find('div', {'class': 'datelist'}).find_all('a')
except:
log.info(f'{social_code}.........年度报告列表为空')
exception = '年度报告列表为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
return
# 获取年报详细信息
for href in list_all:
ip = baseCore.get_proxy()
year_url = 'https://vip.stock.finance.sina.com.cn' + href.get('href')
year_name = href.text
res_2 = requests.get(year_url, proxies=ip)
soup_2 = bs(res_2.content, 'html.parser',from_encoding='gb2312')
try:
pdf_url = soup_2.find('th', {'style': 'text-align:center'}).find('a').get('href')
except:
log.error(f'{social_code}....{year_url}....无下载链接')
exception = '无下载链接'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
continue
resp_content = ''
for i in range(0, 3):
try:
resp_content = requests.get(pdf_url, headers=headers, verify=False, timeout=20).content
break
except:
time.sleep(3)
continue
if not resp_content:
# skip this report if the PDF could not be downloaded after three attempts
continue
try:
year = comp.findall(year_name)[0]
except:
continue
name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
result = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
break
except Exception as e:
log.error(f'{social_code}...年报上传服务器出错:{e}')
time.sleep(3)
continue
if result == '':
exception = '上传服务器失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
continue
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
type_id = '1'
item_id = social_code
group_name = 'group1'
path = bytes.decode(result['Remote file_id']).replace('group1', '')
full_path = bytes.decode(result['Remote file_id'])
category = 'pdf'
file_size = result['Uploaded size']
order_by = num
status = 1
create_by = 'XueLingKun'
create_time = time_now
try:
tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size,
order_by, status, create_by, create_time)
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '')
except:
exception = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
def begin():
while True:
start_time = time.time()
# 获取企业信息
# social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
social_code = '91100000100003962T'
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
if social_code == '':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[15]
code = dic_info[3]
com_name = dic_info[4]
if code is None:
exeception = '股票代码为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
continue
getContent(social_code, com_name, code,start_time)
count += 1
runType = 'AnnualReportCount'
baseCore.updateRun(social_code, runType, count)
break
if __name__ == '__main__':
begin()
cursor.close()
cnx.close()
baseCore.close()
# -*- coding: utf-8 -*-
......@@ -97,10 +97,10 @@ def spider_annual_report(dict_info,num):
try:
# 标题中有年份,
year = re.findall('\d{4}', year_name)[0]
year = re.findall('\d{4}\s*年', year_name)[0]
if com_name != 'null':
name_pdf = f"{com_name}:{year}年报.pdf".replace('*', '')
name_pdf = f"{com_name}:{year}年报.pdf".replace('*', '')
else:
name_pdf = pdf_name_a + '.pdf'
except:
......@@ -144,6 +144,10 @@ def spider_annual_report(dict_info,num):
# name_pdf = pdf_name_a + '.pdf'
with cnx.cursor() as cursor:
if '年' in year:
year = year.split('年')[0]
else:
pass
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s and type_id="1" '''
cursor.execute(sel_sql, (social_code, int(year)))
selects = cursor.fetchone()
......@@ -208,7 +212,7 @@ def spider_annual_report(dict_info,num):
'code': '200',
}
print(dic_result)
return True
# return True
except Exception as e:
dic_result = {
'success': 'false',
......@@ -225,6 +229,8 @@ def spider_annual_report(dict_info,num):
# num = num + 1
time.sleep(2)
# browser.quit()
return True
#state1
if __name__ == '__main__':
......@@ -233,8 +239,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# 获取企业信息
social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
# social_code = '911100007109288314'
# social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
social_code = '9133060072360502XQ'
if not social_code:
time.sleep(20)
continue
......@@ -245,7 +251,7 @@ if __name__ == '__main__':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[15]
count = dic_info[16]
code = dic_info[3]
com_name = dic_info[4]
if code is None:
......
"""
"""
......@@ -534,7 +534,7 @@ def job(taskType):
baseCore.close()
if __name__=='__main__':
task_type = '财务数据/东方财富网/福布斯'
task_type = '财务数据/东方财富网'
job(task_type)
......
"""
"""
......@@ -293,11 +293,12 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
# 信息插入数据库
insert = InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type)
log.info(f'======={short_name}========{code}===插入公告库成功')
if insert:
# # 公告信息列表
# okCount = okCount + 1
# 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
log.info(f'======={short_name}========{code}===插入公告库成功')
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)
if result:
......@@ -319,7 +320,9 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
# except:
# pass
continue
else:
log.info(f'======={short_name}========{code}===已存在')
continue
if __name__ == '__main__':
num = 0
......
......@@ -440,11 +440,21 @@ class BaseCore:
def doc_page(self,file_path):
doc = Document(file_path)
return len(doc.sections)
def pdf_page(self,resp_content):
# 解析pdf文件
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
return page_size
def pdf_content(self,resp_content):
# 解析pdf文件内容
content = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
with fitz.open(stream=resp_content, filetype='pdf') as doc:
# page_size = doc.page_count
for page in doc.pages():
content += page.get_text()
break
except:
time.sleep(3)
continue
return content
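# Usage sketch (assumes resp_content holds the raw bytes of a downloaded PDF):
#   pages = baseCore.pdf_page(resp_content)
#   text = baseCore.pdf_content(resp_content)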
# 替换为绝对路径之后,解析出来a.href
def uploadToserver(self,file_href,item_id):
......@@ -477,7 +487,7 @@ class BaseCore:
# page_size = self.pdf_page(resp_content)
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content)
result = client.upload_by_buffer(resp_content,file_ext_name=category.replace('.',''))
self.getLogger().info('-------文件上传成功------')
break
except:
......
import random
import time
from tqdm import tqdm
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
import urllib3
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
cnx = baseCore.cnx
cursor = baseCore.cursor
headers = {
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9',
'Cache-Control':'no-cache',
'Connection':'keep-alive',
'Cookie':'Hm_lvt_fa835457efbc11dfb88752e70521d23b=1690184499; Hm_lpvt_fa835457efbc11dfb88752e70521d23b=1690184499; SF_cookie_1=98184645; Hm_lvt_2b5618a441c142a90e1a75f4b226c252=1690189470; Hm_lpvt_2b5618a441c142a90e1a75f4b226c252=1690189470; zh_choose=n; wdcid=30ffdae06d11dbde; wdlast=1690189470; wdses=13ee59561f2fb725',
'Host':'www.sasac.gov.cn',
'Pragma':'no-cache',
'Referer':'https://www.baidu.com/link?url=CcQEFfXAeQsxu1IlLlxj8WHugAcJ7sBjOBqvZYDfN7WE6OZpSUM4prK6DiADOqTP&wd=&eqid=d507a037000987780000000364be37d4',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
}
# 创建一个ExcelWriter对象
writer = pd.ExcelWriter('国务院厅局.xlsx')
url = 'http://www.sasac.gov.cn/n2588020/index.html'
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip)
soup = BeautifulSoup(res.content,'html.parser')
time.sleep(2)
#厅局列表
list_type = soup.find('div',class_='l-jgkk-right column').find_all('dd')
list_error = []
for type in tqdm(list_type[:2]):
list_news = []
href_type = type.find('a')['href']
ting_type = type.find('a').text
print(f'\n================厅局类别==={ting_type}========================')
if 'http' in href_type:
url_type = href_type
else:
url_type = 'http://www.sasac.gov.cn/' + href_type.replace('../','')
# print(url_type)
i_res = requests.get(url_type, headers=headers)
i_soup = BeautifulSoup(i_res.content,'html.parser')
time.sleep(2)
news_list = i_soup.find('div',class_='tjywBottom').find_all('li')
#文章列表
# print('================新闻列表==================')
for news in tqdm(news_list[:2]):
try:
news_href = news.find('a')['href']
except:
continue
if 'http' in news_href:
news_url = news_href
else:
news_url = 'http://www.sasac.gov.cn/' + news_href.replace('../','')
news_title = news.find('a').text.split('[')[0]
print(f'\n----正在采集: {news_title}-------')
pub_time = news.find('span').text.replace('[','').replace(']','')
#文章信息
ii_res = requests.get(news_url, headers=headers)
ii_soup = BeautifulSoup(ii_res.content,'html.parser')
# todo:相对路径转化为绝对路径
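# One possible way to handle the todo above (a sketch, assuming ii_soup is the article's
# BeautifulSoup object and news_url its absolute URL):
#   from urllib.parse import urljoin
#   for tag in ii_soup.find_all(['a', 'img']):
#       attr = 'href' if tag.name == 'a' else 'src'
#       if tag.get(attr):
#           tag[attr] = urljoin(news_url, tag[attr])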
time.sleep(2)
try:
news_info = ii_soup.find('div',class_='zsy_cotitle')
except Exception as e:
print(e)
news_info = ''
if news_info:
try:
pub_source = news_info.find('p').text.split('文章来源:')[1].split('发布时间')[0]
except:
pub_source = ''
try:
content = ii_soup.find('div','zsy_comain').text.replace('扫一扫在手机打开当前页','').strip()
except:
content = ''
# print(news_url)
dic_news = {
'标题':news_title,
'发布时间':pub_time,
'来源':pub_source,
'内容':content,
'原文链接':news_url
}
list_news.append(dic_news)
else:
dic_error = {
'标题': news_title,
'原文链接':news_url,
'厅局类别':ting_type
}
list_error.append(dic_error)
df = pd.DataFrame(list_news)
# 将数据写入不同的sheet页
df.to_excel(writer, sheet_name=ting_type,index=False)
print(f'=============当前sheet页{ting_type}---数据总数:{len(df)}================')
time.sleep(1)
writer.save()
df_error = pd.DataFrame(list_error)
df_error.to_excel('未采到文章.xlsx',index=False)
......@@ -13,7 +13,7 @@ baseCore = BaseCore()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = baseCore.getLogger()
headers = {
'Cookie':'HWWAFSESID=0e10b77869899be8365; HWWAFSESTIME=1688781923708; csrfToken=VeTF4UIZKJ0q6yWmgfC_FLqv; TYCID=e7cec7501d3311eea9dcb9fb7af79aad; ssuid=3142278034; sajssdk_2015_cross_new_user=1; bannerFlag=true; _ga=GA1.2.1006597844.1688781929; _gid=GA1.2.146077413.1688781929; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1688781929; tyc-user-info={%22state%22:%220%22%2C%22vipManager%22:%220%22%2C%22mobile%22:%2217103123002%22}; tyc-user-info-save-time=1688781977329; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxNzEwMzEyMzAwMiIsImlhdCI6MTY4ODc4MTk3NiwiZXhwIjoxNjkxMzczOTc2fQ.Luw0DCFul8WxRNOM8X5-NCmy_z3BwJC5JBvofWqWkSQOleJ6zJU0SRbqwAobPfOfVyGFDUBqmxxWd4YKCeCWeQ; tyc-user-phone=%255B%252217103123002%2522%255D; searchSessionId=1688778331.16177575; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22302953956%22%2C%22first_id%22%3A%22189333f38cb947-0fb9b252742a6c-26031d51-921600-189333f38cdcdd%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg5MzMzZjM4Y2I5NDctMGZiOWIyNTI3NDJhNmMtMjYwMzFkNTEtOTIxNjAwLTE4OTMzM2YzOGNkY2RkIiwiJGlkZW50aXR5X2xvZ2luX2lkIjoiMzAyOTUzOTU2In0%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22302953956%22%7D%2C%22%24device_id%22%3A%22189333f38cb947-0fb9b252742a6c-26031d51-921600-189333f38cdcdd%22%7D; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1688781980',
'Cookie':'HWWAFSESID=b6312a4594bea18413c; HWWAFSESTIME=1686818921445; csrfToken=e7sNDKWelJwlcjnm6Rlny887; TYCID=6ff6bc600b5911ee89d35bf79a73a3b1; bannerFlag=true; ssuid=1534238432; refresh_page=0; _ga=GA1.2.1790752229.1688467828; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22307016917%22%2C%22first_id%22%3A%22188be3e337e4bf-0d85716d366e44-26031d51-1049088-188be3e337f19e%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg4YmUzZTMzN2U0YmYtMGQ4NTcxNmQzNjZlNDQtMjYwMzFkNTEtMTA0OTA4OC0xODhiZTNlMzM3ZjE5ZSIsIiRpZGVudGl0eV9sb2dpbl9pZCI6IjMwNzAxNjkxNyJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22307016917%22%7D%2C%22%24device_id%22%3A%22188be3e337e4bf-0d85716d366e44-26031d51-1049088-188be3e337f19e%22%7D; jsid=SEO-BAIDU-ALL-SY-000001; bdHomeCount=7; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1693986307; tyc-user-info=%7B%22state%22%3A%220%22%2C%22vipManager%22%3A%220%22%2C%22mobile%22%3A%2213592481839%22%7D; tyc-user-info-save-time=1693986377592; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzU5MjQ4MTgzOSIsImlhdCI6MTY5Mzk4NjM3NywiZXhwIjoxNjk2NTc4Mzc3fQ.xeK54nMtB5wt7ipdOjhrzdplT1azvezrTuoD1b8i3OguqMB97ZOR1pFbRsP7vsKRdZ3Fsf5Y5ZqlmRKAVHGraA; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1693986412',
# 'Cookie': 'TYCID=82cbe530204b11ed9f23298cecec1c60; ssuid=3927938144; _ga=GA1.2.1842488970.1670638075; jsid=SEO-BAIDU-ALL-SY-000001; tyc-user-info={%22state%22:%220%22%2C%22vipManager%22:%220%22%2C%22mobile%22:%2215565837784%22}; tyc-user-info-save-time=1678953978429; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxNTU2NTgzNzc4NCIsImlhdCI6MTY3ODk1Mzk3OCwiZXhwIjoxNjgxNTQ1OTc4fQ.wsNxLWMkZVrtOEvo_CCDPD38R7F23c5yk7dFAdHkwFPkZhEEvmiv0nlt7UD0ZWfo3t8aYxc4qvu4ueEgMubJ5g; tyc-user-phone=%255B%252215565837784%2522%255D; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22284710084%22%2C%22first_id%22%3A%22182b9ca585ead-089598c1d7f7928-26021d51-1327104-182b9ca585f7f1%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfbG9naW5faWQiOiIyODQ3MTAwODQiLCIkaWRlbnRpdHlfY29va2llX2lkIjoiMTgyYjljYTU4NWVhZC0wODk1OThjMWQ3Zjc5MjgtMjYwMjFkNTEtMTMyNzEwNC0xODJiOWNhNTg1ZjdmMSJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22284710084%22%7D%2C%22%24device_id%22%3A%22182b9ca585ead-089598c1d7f7928-26021d51-1327104-182b9ca585f7f1%22%7D; HWWAFSESID=fa776898fa88a6520ea; HWWAFSESTIME=1679899464128; csrfToken=m3cB6mHsznwIuppkT-S8oYc6; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1679016180,1679471093,1679732923,1679899468; bdHomeCount=28; bannerFlag=true; show_activity_id_92=92; searchSessionId=1679899783.48494979; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1679899783',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
......@@ -27,7 +27,7 @@ def doJob():
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('CorPersonEnterprise:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待
# social_code = 'ZZSN23011300000004'
# social_code = '91110108778635402E'
if social_code == None:
time.sleep(20)
continue
......@@ -143,7 +143,10 @@ def doJob():
if list_all:
for one_info in list_all:
name = one_info['personal_name']
sex = one_info['gender2']
try:
sex = one_info['gender2']
except:
sex = ''
education = ''
position = one_info['position_name']
Salary = ''
......
......@@ -37,6 +37,11 @@ def getTycIdByXYDM(xydm):
response = requests.post(url,json=paramJsonData,headers=headers,verify=False, proxies=ip)
time.sleep(random.randint(3, 5))
retJsonData =json.loads(response.content.decode('utf-8'))
if retJsonData['data'] and retJsonData['state']== 'ok':
pass
else:
log.error(f"---{xydm}-未查询到该企业---")
return retData['tycData']
matchType=retJsonData['data'][0]['matchType']
if matchType=='信用代码匹配':
retData['state'] = True
......
......@@ -54,11 +54,10 @@ if __name__=="__main__":
# chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
chromedriver = r'D:/chrome/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(70)
time.sleep(60)
s = requests.session()
#获取到token和cookies
......
......@@ -4,18 +4,14 @@
'''
import requests, time, random, json, pymysql, redis
import pandas as pd
import urllib3
from bs4 import BeautifulSoup
from openpyxl import Workbook
from selenium import webdriver
from obs import ObsClient
from kafka import KafkaProducer
# logging.basicConfig(filename='example.log', level=logging.INFO)
from base.BaseCore import BaseCore
import os
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
......@@ -25,48 +21,6 @@ cursor_ = baseCore.cursor
r = baseCore.r
urllib3.disable_warnings()
def check_url(sid, article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
res = r.sismember(f'wx_url_{sid}',article_url)
if res == 1:
return True
else:
return False
def add_url(sid, article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
res = r.sadd(f'wx_url_{sid}', article_url, 3) # 注意是 保存set的方式
if res == 0: # 若返回0,说明插入不成功,表示有重复
return True
else:
return False
# #定时
# def getFromSql():
# selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
# cursor.execute(selectSql)
# results = cursor.fetchall()
# result_list = [item[0] for item in results]
#
# #放入redis
# for item in result_list:
# r.rpush('WeiXinGZH:infoSourceCode', item)
#
# #刷新浏览器并获得token
# def flushAndGetToken(list_b):
# browser_run = list_b[0]
# log.info('======刷新浏览器=====')
# browser_run.refresh()
# cookie_list = browser_run.get_cookies()
# cur_url = browser_run.current_url
# token = cur_url.split('token=')[1]
# log.info(f'===========当前token为:{token}============')
# cookies = {}
# for cookie in cookie_list:
# cookies[cookie['name']] = cookie['value']
# return token,cookies
#采集失败的公众号 重新放入redis
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
......@@ -165,22 +119,33 @@ def get_info(dict_json):
img_one.extract()
else:
try:
name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
except:
img_one.extract()
continue
try:
res = requests.get(url_src, timeout=20)
except:
img_one.extract()
resp = obsClient.putContent('zzsn', name_img, content=res.content)
url_img = resp['body']['objectUrl']
str_url_img = f'<img src="{url_img}">'
try:
img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
try:
name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
except:
img_one.extract()
continue
try:
res = requests.get(url_src, timeout=20)
except:
img_one.extract()
continue
resp = obsClient.putContent('zzsn', name_img, content=res.content)
try:
url_img = resp['body']['objectUrl']
str_url_img = f'<img src="{url_img}">'
except Exception as e:
log.info(f'--error--{url_news}-----------{e}')
updatewxLink(url_news, info_source_code, 300)
return False
try:
img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
except Exception as e:
log.info(f'--error--{url_news}-----------{e}')
updatewxLink(url_news, info_source_code, 300)
return False
except Exception as e:
log.info(f'----{url_news}-----------{e}')
log.info(f'--error--{url_news}-----------{e}')
updatewxLink(url_news, info_source_code, 600)
return False
for tag in news_html.descendants:
......
......@@ -92,7 +92,7 @@ def insertWxList(dic_url,json_search,page):
for one_news in list_all_news:
listCount=listCount+1
news_title = one_news['title']
timestamp = one_news['create_time']
timestamp = one_news['update_time']
time_local = time.localtime(timestamp)
news_date = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
url_news = one_news['link']
......@@ -201,10 +201,19 @@ def getPageData(dic_url,page):
error = [origin, url_, info_source_code, str_t, '无效session']
insertBadSql(error)
return getPageData(dic_url, page)
elif ret == 200074:
#{"base_resp": {"ret": 200074, "err_msg": "default"}}
log.info(f'======{origin}-----{biz}----该账号未登录成功=======')
# session失效修改token
updateTokeen(token, 2)
error = [origin, url_, info_source_code, str_t, '该账号未登录成功']
insertBadSql(error)
return getPageData(dic_url, page)
else:
log.info(f'======{origin}-----{biz}----该账号其他错误=======')
error = [origin, url_, info_source_code, str_t, '其他错误']
insertBadSql(error)
updateTokeen(token, 2)
return True
# 修改token使用时间
updateTokeen(token, 3)
......@@ -225,7 +234,7 @@ def getWxList(infoSourceCode):
return
origin = dic_url['name']
biz = dic_url['biz']
for page in range(1,2):
for page in range(1,6):
retFlag = getPageData(dic_url, page)
time.sleep(random.randint(60,181))
if retFlag:
......
# -*- coding: utf-8 -*-
......@@ -55,19 +55,13 @@ def getZx(xydm, url, title, cnx, path):
# 动态信息列表
list_info = [
xydm,
title,
'',
content,
pub_time,
url,
'雅虎财经',
author,
'2',
'en'
'2'
]
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
......@@ -77,14 +71,10 @@ def getZx(xydm, url, title, cnx, path):
return exception
log.info(f"文章耗时,耗时{baseCore.getTimeCost(start_time_content, time.time())}")
try:
sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
cursor.execute(sel_sql, (url, social_code))
row = cursor.fetchone()
id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': id,
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': content,
......@@ -171,6 +161,7 @@ def scroll(xydm,name,gpdm):
except Exception as e:
log.error(f"{name}--{gpdm}--获取不到最后一条链接")
break
# todo:增量时 需打开注释
# try:
# selects = selectUrl(last_url_,xydm)
# except:
......@@ -189,12 +180,14 @@ def rePutIntoR(item):
if __name__ == "__main__":
path = r'D:\chrome\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
cnx = baseCore.cnx
cursor = baseCore.cursor
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gwqy_socialCode')
social_code = baseCore.redicPullData('NewsEnterpriseFbs:gwqy_socialCode')
# social_code = 'ZZSN22080900000046'
# 判断 如果Redis中已经没有数据,则等待
if not social_code :
......