Commit 9a76d2aa  Author: 丁双波

Merge remote-tracking branch 'origin/master'

@@ -421,6 +421,7 @@ def NQEnterprise():
    nq_social_list = [item[0] for item in nq_result]
    for item in nq_social_list:
        # NEEQ companies: financial data, listing info and key personnel are already collected; company news and announcements are not. The announcement script is ready; company news codes must be pushed into redis every day.
        # r.rpush('NQEnterprise:nq_Ipo', item)
        r.rpush('NQEnterprise:nq_finance', item)
        # r.rpush('NQEnterprise:nq_notice', item)
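A minimal sketch of the consuming side of this queue, assuming a worker pops codes off the same list (host and password mirror the redis connections used elsewhere in this commit):

import redis

r = redis.Redis(host='114.115.236.206', port=6379, password='clbzzsn')
# pop one social credit code per iteration; None means the queue is drained
item = r.lpop('NQEnterprise:nq_finance')
if item:
    social_code = item.decode('utf-8')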
@@ -451,11 +452,26 @@ def omeng():

# single-item champion enterprises
def danxiangguanjun():
    cnx, cursor = connectSql()
    query = "SELECT CompanyName FROM champion"
    cursor.execute(query)
    result = cursor.fetchall()
    cnx.commit()
    com_namelist = [item[0] for item in result]
    for item in com_namelist:
        r.rpush('champion:baseinfo', item)

# SOE technology-reform demonstration enterprises
def kegaishifan():
    cnx, cursor = connectSql()
    query = "SELECT CompanyName FROM technological"
    cursor.execute(query)
    result = cursor.fetchall()
    cnx.commit()
    com_namelist = [item[0] for item in result]
    for item in com_namelist:
        r.rpush('technological:baseinfo', item)
# "double hundred" enterprises
def shuangbaiqiye():
@@ -467,6 +483,8 @@ def zhuangjingtexind():
if __name__ == "__main__":
    start = time.time()
    # danxiangguanjun()
    kegaishifan()
    # NoticeEnterprise()
    # AnnualEnterpriseIPO()
    # AnnualEnterprise()
@@ -477,7 +495,7 @@ if __name__ == "__main__":
    # FBS()
    # MengZhi()
    # NQEnterprise()
    # SEC_CIK()
    # omeng()
    # AnnualEnterpriseUS()
    # NoticeEnterprise_task()
...
@@ -85,7 +85,22 @@ if __name__=='__main__':
        ein = jsonData['ein']  # federal tax ID (EIN)
        address = jsonData['addresses']
        city = address['business']['city']
        try:
            if city:
                business_address = address['business']['street1'] + ',' + city + ' ' + address['business'][
                    'stateOrCountryDescription']
            else:
                business_address = address['business']['stateOrCountryDescription']
        except:
            try:
                business_address = address['business']['street1'] + ',' + city
            except:
                try:
                    business_address = city + ' ' + address['business']['stateOrCountryDescription']
                except:
                    business_address = ''
        # city = address['business']['city']
        # business_address = address['business']['street1'] + ',' + city + ' ' + address['business']['stateOrCountryDescription']
        phone = jsonData['phone']  # phone number
        try:
            formerNames = jsonData['formerNames'][0]['name']  # former names
...
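The nested try/except chain above covers every combination of missing address parts; a flatter sketch with the same intent (assuming the same jsonData shape) would filter out the missing pieces before joining:

business = address.get('business', {}) or {}
parts = [business.get('street1'), business.get('city'),
         business.get('stateOrCountryDescription')]
# keep only the parts that are present, then join them
business_address = ','.join(p for p in parts if p)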
"""
解析json数据 两个链接:
https://data.sec.gov/api/xbrl/companyfacts/CIK0000320193.json 数据值和gaap字段
https://www.sec.gov/Archives/edgar/data/320193/000032019322000108/MetaLinks.json html字段和gaap字段映射
step1:拼接链接
step2:
"""
import json
import time
import requests
from kafka import KafkaProducer
from operator import itemgetter
from itertools import groupby
from base.BaseCore import BaseCore
# import urllib3
# urllib3.disable_warnings()
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
def fromcikgetinfo(cik):
    # parameterized query avoids SQL injection via the cik value
    query = "select * from mgzqyjwyh_list where cik=%s"
    cursor.execute(query, (cik,))
    data = cursor.fetchone()
    return data
def getRequest(url):
headers = {
'Host': 'data.sec.gov',
'Connection': 'keep-alive',
'Cache-Control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cookie': '_ga=GA1.2.784424676.1695174651; _4c_=%7B%22_4c_s_%22%3A%22lZFLT4QwFIX%2FyqRrILS0pbAzmBgXajQ%2BlhNpLwOZcUoKDo4T%2Fru3gMbHym5ov55zcjk9kaGGPcmpzARNuVRcxElAtnDsSH4irjH%2BcyA50awsDTUq1ElShZwZCMuKmbASSQUUKsYoIwF5w6w0ZpmIpeBKqTEgul0yTkRbA5hFs4iqKA6rDh39OxKuYty2zppX3a%2F7Y%2BtlA5SrzmzxwsCh0bAeGtPX3s8m%2BUJraDZ1jzhlE22dl0QC90OzN3b47Vvol0%2BkFGnp7NCB9xa1sy%2BwolQitlgEeZocfloHFTg3yfDUNb0ftAMdbexhAVjezMKZPTaemtV9cYf8%2Bhu5LW6uFtT6jv0YO6ufdz4UnyUgF2frh8tz%2F2%2BKc8ZlKqPPpxKUjHPfCJiksRAZldhnvyO5kjz2a5yTp%2FrpTzVXWfZXPbcQ%2Bulh%2Fx%2FrOH4A%22%7D; _ga_300V1CHKH1=GS1.1.1695174651.1.1.1695174684.0.0.0; ak_bmsc=91C6D28D093861656DB8C1FC1972DAB6~000000000000000000000000000000~YAAQlQ8kF2U6orCKAQAAgyl9uxX8kNk3C77pkMi6N6RxnsUqDbYEmIcNjtLSa8W6kfGL9cQMRHBUaYcbEA1+oXsvUwUF80G8hmH/F4S0ZOEnVCrlcBLx219N24l2qmoSKtVDH+VKe7c1bji9MHc7tO2R56R7juZJv9gceAdtKEuArkPfD8ijx/TyEgIrM+XruGtzCRmLnfq86UoJYP+j+tXcaWkc/qm1zHDReDNf/cHd6h2aRMs4lsES8+uh6YTjE7bfCp8h2DNJ2e07pm0ojcI/kdycUPHmuTqWPdTBEjUybad31E1hRNBAE8PbGjy2lvlPY/piuN3HX3Q5ifsmTqCNJzynN2kjGm6i4SHhmEAijUeIzNQXB11GrVmALJVV6pEjd/uu; bm_sv=FD8981426EA388050697DFB615BAFFE3~YAAQ1wcsF5K72ZSKAQAAsvl/uxUw0do3nknGCkllXH27UZBpM7kQUXm4crBNTAkhek5YSDKIrrm2uFWidfpBfyxbRSr+w7FH7Y0w4cXMAa7BELzcc/B9Uf8T6e2I2W29wjurKkBFtSseslHSqYD3BWx9/GidJMW+dFNrlzNUMd1dONUR9J1TDnYifPhE6A/zSLPHVrCTJl7xzg7VlW/05Ay0i+Bo7TynZdWgotfjET3vg2/ZVixVSGaWeQo4~1'
}
    response = None
    for m in range(0, 3):
        try:
            response = requests.get(url=url, headers=headers, verify=False)
            break
        except Exception as e:
            log.error(f"request请求异常-------{e}")
            continue
    # check the response status code; response stays None if every attempt failed
    if response is not None and response.status_code == 200:
        jsonData = response.json()
        return jsonData
    else:
        return False
if __name__=='__main__':
taskType = '财务数据/SEC'
zcfzb_mapping = {
'AccountsAndOtherReceivablesNetCurrent':'指标1'
}
lrb_mapping = {
}
xjllb_mapping = {
}
    while True:
        start_time = time.time()
        # todo: pull the company cik from redis
        # cik = baseCore.redicPullData('sec_cik_US:uscik')
        cik = '320193'
        # look up the company info in the database by cik
        data = fromcikgetinfo(cik)
        com_name = data[2]
        com_code = data[3]
        exchange = data[4]
        # the cik in the URL must be zero-padded to ten digits
        url_cik = cik
        while len(url_cik) < 10:
            url_cik = '0' + url_cik
        url = f'https://data.sec.gov/api/xbrl/companyfacts/CIK{url_cik}.json'
        jsonData = getRequest(url)
        if jsonData:
            pass
        print(jsonData)
        try:
            us_gaap = jsonData['facts']['us-gaap']
        except:
            continue
        # iterate over the mapping keys
        Listzcfzb = []
        for key in zcfzb_mapping.keys():
            # all years and amounts for one financial indicator
            usd_list = us_gaap[key]['units']['USD']
            # keep only annual reports: form 10-K, fp FY
            for j in usd_list:
                form = j['form']
                fp = j['fp']
                if form == '10-K' and fp == 'FY':
                    pass
                else:
                    continue
                date = j['end']
                if date.endswith('03-31') or date.endswith('06-30') or date.endswith('09-30') or date.endswith('12-31'):
                    pass
                else:
                    continue
                val = j['val']
                zcfzb_dic = {
                    'zbname': key,
                    'riqi': date,
                    'jine': val,
                    'fp': fp,
                    'form': form
                }
                # balance-sheet indicators across all years
                Listzcfzb.append(zcfzb_dic)
        # groupby only merges consecutive items, so sort by date first
        Listzcfzb.sort(key=itemgetter('riqi'))
        groups = groupby(Listzcfzb, key=itemgetter('riqi'))
        # walk each date group and print the grouped result
        for riqi, group in groups:
            print(f"riqi: {riqi}")
            listbydate = [item for item in group]
            print()
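The zero-padding loop in the __main__ block can also be written with str.zfill; a one-line sketch of the URL construction:

cik = '320193'
url = f'https://data.sec.gov/api/xbrl/companyfacts/CIK{cik.zfill(10)}.json'
# -> https://data.sec.gov/api/xbrl/companyfacts/CIK0000320193.json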
"""从html页面中抽取表格"""
import requests
from bs4 import BeautifulSoup
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
def getRequest(url):
headers = {
'Referer': 'https://www.sec.gov/ix?doc=/Archives/edgar/data/356037/000035603723000038/cspi-20230630x10q.htm',
'Sec-Ch-Ua': '"Microsoft Edge";v="117", "Not;A=Brand";v="8", "Chromium";v="117"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.31',
}
    response = None
    for m in range(0, 3):
        try:
            response = requests.get(url=url, headers=headers, verify=False)
            break
        except Exception as e:
            log.error(f"request请求异常-------{e}")
            continue
    # check the response status code; response stays None if every attempt failed
    if response is not None and response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')
        return soup
    else:
        return False
def getzcfztable(soup):
table_list = soup.find_all('table')
for table in table_list:
aa = table.find_all(text='Current assets:')
if aa:
# print(table)
trlist = table.find_all('tr')
date1 = trlist[1].find_all('td')[1].text.replace('\n', '')
date2 = trlist[1].find_all('td')[-1].text.replace('\n', '')
print(date1, date2)
# todo: drop the td cells whose content is empty
for tr in trlist[2:]:
filtered_tags = tr(lambda tag: tag.name == 'td' and '$' in tag.text)
for tag in filtered_tags:
tag.extract()
# filtered_tags2 = tr(lambda tag:tag.name=='td' and tag.text==' ')
filtered_tags2 = tr(lambda tag: tag.name == 'td' and tag.text == '')
for tag in filtered_tags2:
tag.extract()
try:
zbtag = tr.find_all('td')[0].text.replace('\n', '')
except:
zbtag = ''
try:
cash1 = tr.find_all('td')[1].text.replace('\n', '')
except:
cash1 = ''
try:
cash2 = tr.find_all('td')[2].text.replace('\n', '')
except:
cash2 = ''
if zbtag != '' and cash1 != '' and cash2 != '':
print(f'字段:{zbtag} 值1:{cash1} 值2:{cash2}')
if __name__=='__main__':
url = 'https://www.sec.gov/Archives/edgar/data/320193/000032019321000105/aapl-20210925.htm'
soup = getRequest(url)
# parse the balance-sheet table from the html
getzcfztable(soup)
import json
import random
import requests, time, pymysql
import jieba
import sys
@@ -45,24 +47,21 @@ def beinWork(tyc_code, social_code,start_time):
    retData = {'total': 0, 'up_okCount': 0, 'up_errorCount': 0, 'up_repetCount': 0}
    t = time.time()
    url = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={t}&id={tyc_code}&ps={pageSize}&pn=1&emotion=-100&event=-100'
    try:
        for m in range(0, 3):
            ip = baseCore.get_proxy()
            headers['User-Agent'] = baseCore.getRandomUserAgent()
            response = requests.get(url=url, headers=headers, proxies=ip, verify=False)
            time.sleep(random.randint(3, 5))
            break
        if (response.status_code == 200):
            pass
    except Exception as e:
        log.error(f"{tyc_code}-----获取总数接口失败")
        error = '获取总数接口失败'
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{error}----{e}')
        return retData
    try:
        json_1 = json.loads(response.content.decode('utf-8'))
@@ -177,7 +176,7 @@ def beinWork(tyc_code, social_code,start_time):
                pass
                continue
            try:
                insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,create_time) values(%s,%s,%s,%s,%s,now())'''
                # field list for the news record
                up_okCount = up_okCount + 1
                list_info = [
@@ -185,6 +184,7 @@ def beinWork(tyc_code, social_code,start_time):
                    link,
                    '天眼查',
                    '2',
                    time_format
                ]
                cursor_.execute(insert_sql, tuple(list_info))
                cnx_.commit()
@@ -214,10 +214,10 @@ def beinWork(tyc_code, social_code,start_time):
                }
            except Exception as e:
                log.info(f'传输失败:{social_code}----{link}')
                error = '数据库传输失败'
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, link, f'{error}----{e}')
                continue
            # print(dic_news)
            # send the fields to kafka for storage
...
import json
@@ -21,6 +21,7 @@ tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/证监会'
pathType = 'ZJHAnnualReport/'

def RequestUrl(url, payload, item_id, start_time):
    # ip = get_proxy()[random.randint(0, 3)]
@@ -43,26 +44,26 @@ def RequestUrl(url, payload, item_id, start_time):
    return soup

# def tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
#                 create_by, create_time, page_size):
#
#     sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=1'''
#     cursor_.execute(sel_sql, (item_id, year))
#     selects = cursor_.fetchone()
#     if selects:
#         print(f'{name_pdf},{year}已存在')
#
#     else:
#         Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
#
#         values = (
#             year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
#             create_by,
#             create_time, page_size)
#
#         cursor_.execute(Upsql, values)  # insert
#         cnx.commit()  # commit
#         print("更新完成:{}".format(Upsql))

# collect the reports
def SpiderByZJH(url, payload, dic_info, num, start_time):
@@ -121,19 +122,24 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
            cursor_.execute(sel_sql, (item_id, year))
            selects = cursor_.fetchone()
            if selects:
                log.info(f'com_name:{short_name}、{year}已存在')
                continue
            else:
                retData = baseCore.uptoOBS(pdf_url, name_pdf, 1, social_code, pathType, taskType, start_time)
                if retData['state']:
                    pass
                else:
                    log.info(f'====pdf解析失败====')
                    return False
                # insert into the database and get att_id
                num = num + 1
                att_id = baseCore.tableUpdate(retData, short_name, year, name_pdf, num)
                if att_id:
                    pass
                else:
                    return False
                content = retData['content']
                time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                dic_news = {
                    'attachmentIds': att_id,
@@ -169,7 +175,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
                    'message': '操作成功',
                    'code': '200',
                }
                log.info(dic_result)
                return True
            except Exception as e:
                dic_result = {
@@ -181,7 +187,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
                log.info(dic_result)
                return False
        else:
            continue
@@ -311,7 +317,8 @@ if __name__ == '__main__':
            time.sleep(20)
            continue
        dic_info = baseCore.getInfomation(social_code)
        count = dic_info[16]
        log.info(f'====正在采集{social_code}=====')
        # Shanghai http://eid.csrc.gov.cn/101111/index.html  Shenzhen http://eid.csrc.gov.cn/101811/index.html  Beijing Stock Exchange http://eid.csrc.gov.cn/102611/index.html
        # the url varies by page and column, e.g. http://eid.csrc.gov.cn/101811/index_3_f.html
        url_parms = ['101111', '101811', '102611']
@@ -322,7 +329,7 @@ if __name__ == '__main__':
            dic_parms = getUrl(code, url_parms, Catagory2_parms)
            SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, num, start_time)
            end_time = time.time()
            log.info(f'{dic_info[4]} ---- 该企业耗时 ---- {end_time - start_time}')
        count += 1
        runType = 'AnnualReportCount'
        baseCore.updateRun(social_code, runType, count)
...
# -*- coding: utf-8 -*-
@@ -152,24 +152,23 @@ def spider_annual_report(dict_info,num):
        cursor.execute(sel_sql, (social_code, int(year)))
        selects = cursor.fetchone()
        if selects:
            log.info(f'com_name:{com_name}、{year}已存在')
            continue
        else:
            # upload the file to the OBS server
            retData = baseCore.uptoOBS(pdf_url, name_pdf, 1, social_code, pathType, taskType, start_time)
            if retData['state']:
                pass
            else:
                log.info(f'====pdf解析失败====')
                return False
            num = num + 1
            try:
                att_id = baseCore.tableUpdate(retData, com_name, year, name_pdf, num)
                content = retData['content']
                state = 1
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '成功')
            except:
                exception = '数据库传输失败'
                state = 0
@@ -236,6 +235,7 @@ def spider_annual_report(dict_info,num):
if __name__ == '__main__':
    num = 0
    taskType = '企业年报/雪球网'
    pathType = 'XQWAnnualReport/'
    while True:
        start_time = time.time()
        # fetch the company info
...
@@ -14,6 +14,12 @@ def conn11():
    cursor = conn.cursor()
    return conn, cursor

def conn144():
    conn = pymysql.Connect(host='114.115.159.144', port=3306, user='caiji', passwd='zzsn9988', db='caiji',
                           charset='utf8')
    cursor = conn.cursor()
    return conn, cursor

# company announcements
def shizhiCodeFromSql():
    conn, cursor = conn11()
@@ -31,6 +37,7 @@ def shizhiCodeFromSql():
    finally:
        cursor.close()
        conn.close()

# company announcements
def yahooCodeFromSql():
    conn, cursor = conn11()
@@ -49,6 +56,25 @@ def yahooCodeFromSql():
        cursor.close()
        conn.close()

# stock codes of NYSE-listed companies for Sina
def sinausstockCodeFromSql():
    conn, cursor = conn144()
    try:
        gn_query = "select ticker from mgzqyjwyh_list where state=2 and exchange='NYSE'; "
        cursor.execute(gn_query)
        gn_result = cursor.fetchall()
        gn_social_list = [item[0] for item in gn_result]
        print('sinausstockCodeFromSql开始将股票代码放入redis=======')
        for item in gn_social_list:
            r.rpush('sina_usstock:securities_code', item)
        print('sinausstockCodeFromSql将股票代码放入redis结束')
    except Exception as e:
        log.info("数据查询异常")
    finally:
        cursor.close()
        conn.close()

def yahooCode_task():
    # instantiate a scheduler
    scheduler = BlockingScheduler()
@@ -58,9 +84,12 @@ def yahooCode_task():
    scheduler.add_job(yahooCodeFromSql, 'cron', day='*/3', hour=0, minute=0)
    # run once a day
    scheduler.add_job(shizhiCodeFromSql, 'cron', hour=10, minute=0)
    # run every three days
    scheduler.add_job(sinausstockCodeFromSql, 'cron', day='*/3', hour=0, minute=0)
    try:
        # yahooCodeFromSql()  # run once before the schedule starts
        # shizhiCodeFromSql()  # run once before the schedule starts
        sinausstockCodeFromSql()  # run once before the schedule starts
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
...
# -*- coding: utf-8 -*-
@@ -373,6 +373,28 @@ class YahooCaiwu(object):
            currency = ''
        return currency

    # trigger the comparative-indicator calculation
    def calculateIndexReq(self):
        get_url = 'http://114.115.236.206:8088/sync/calculateIndex'
        try:
            params = {
                'type': 2
            }
            resp = requests.get(get_url, params=params)
            print(resp.text)
            text = json.loads(resp.text)
            codee = text['code']
            while codee == -200:
                time.sleep(600)
                # keep the same params on retry so the endpoint still receives type=2
                resp = requests.get(get_url, params=params)
                print(resp.text)
                text = json.loads(resp.text)
                codee = text['code']
                if codee == -200:
                    break
            print('调用接口成功!!')
        except:
            print('调用失败!')

if __name__ == '__main__':
    # parse_excel()
    # get_content1()
@@ -383,8 +405,11 @@ if __name__ == '__main__':
        securitiescode = yahoo.getCodeFromRedis()
        yahoo.get_content2(securitiescode)
    except Exception as e:
        yahoo.calculateIndexReq()
        if securitiescode:
            yahoo.r.rpush('NoticeEnterprise:securities_code', securitiescode)
        else:
            time.sleep(300)
            print('没有数据暂停5分钟')
...
import configparser
@@ -20,6 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from operator import itemgetter
from itertools import groupby
import datetime
from decimal import Decimal

class SinaUsstock(object):
@@ -54,13 +55,19 @@ class SinaUsstock(object):
            seriesValue = tddoc.find('td').text().split(' ')
            for i in range(0, len(pdate)):
                value = seriesValue[i]
                try:
                    if '亿' in value:
                        value = value.replace("亿", "").replace(",", "")
                        value = Decimal(value) * Decimal('100000000')
                        # value = eval(value)
                    elif '万' in value:
                        value = value.replace("万", "").replace(",", "")
                        value = Decimal(value) * Decimal('10000')
                        # value = eval(value)
                except Exception as e:
                    print(e)
                    print(value)
                vvla = str(value).replace(",", "")
                serisemsg = {
                    'name': seriesName,
                    'value': vvla,
@@ -71,6 +78,31 @@ class SinaUsstock(object):
        return seriesList
    # check whether the stock code has been collected before
    def check_code(self, com_code):
        r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
        res = r.exists('com_sinacaiwushuju_code::' + com_code)
        # if the key exists, this company has been collected before (res = 1)
        if res:
            return False  # not the first collection
        else:
            return True   # first collection

    def check_date(self, com_code, info_date):
        r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
        res = r.sismember('com_sinacaiwushuju_code::' + com_code, info_date)  # note: stored as a set
        if res:
            return True
        else:
            return False

    # record the collected report periods for the stock code in redis
    def add_date(self, com_code, date_list):
        r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
        # put every date in date_list into the set
        for date in date_list:
            res = r.sadd('com_sinacaiwushuju_code::' + com_code, date)

    def getCodeFromRedis(self):
        securitiescode = self.r.lpop('sina_usstock:securities_code')
        securitiescode = securitiescode.decode('utf-8')
@@ -209,7 +241,7 @@ class SinaUsstock(object):
            # convert the data format and send it to the interface
            annualzb = zbl1 + zbl3 + zbl5
            annualzb = self.groupZbData(annualzb, stock, social_credit_code, 'year')
            self.sendToFinance(annualzb)
            quarterzb = zbl2 + zbl4 + zbl6
            quarterzb = self.groupZbData(quarterzb, stock, social_credit_code, 'quarter')
@@ -228,15 +260,26 @@ class SinaUsstock(object):

    def sendToFinance(self, zbmsg):
        for zbb in zbmsg:
            com_code = zbb['securitiesCode']
            com_date = zbb['date']
            # check whether this stock code has been collected before
            if self.check_code(com_code):
                zbb['ynFirst'] = True
            if len(zbb) != 0:
                # call Kaige's interface to store the data
                data = json.dumps(zbb)
                url_baocun = 'http://114.115.236.206:8088/sync/finance/sina'
                # url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
                for nnn in range(0, 3):
                    try:
                        res_baocun = requests.post(url_baocun, data=data)
                        # record the collected stock code and report date to mark it as collected
                        com_date_list = []
                        com_date_list.append(com_date)
                        # add_date expects a list of dates
                        self.add_date(com_code, com_date_list)
                        self.logger.info(res_baocun.text)
                        break
                    except:
@@ -309,7 +352,7 @@ class SinaUsstock(object):

if __name__ == '__main__':
    sinaUsstock = SinaUsstock()
    # securitiescode = sinaUsstock.r.lpop('sina_usstock:securities_code')
    # securitiescode = sinaUsstock.getCodeFromRedis()
    securitiescode = 'AAPL'
    try:
        sinaUsstock.get_content2(securitiescode)
...
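Taken together, check_code, check_date and add_date give per-company, per-period de-duplication on a redis set; a sketch of the intended flow inside sendToFinance (method names from the class above):

if self.check_code(com_code):      # first time this company is collected
    zbb['ynFirst'] = True
if not self.check_date(com_code, com_date):   # skip periods already collected
    # send the payload, then record the period; add_date expects a list
    self.add_date(com_code, [com_date])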
""" """
...@@ -176,7 +176,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType) ...@@ -176,7 +176,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType)
dic_info_zcfzb = { dic_info_zcfzb = {
"name": info_name, "name": info_name,
'enName': info_name_en, 'enName': info_name_en,
"value": info_data "value": info_data,
"unit": "元"
} }
list_zcfzb.append(dic_info_zcfzb) list_zcfzb.append(dic_info_zcfzb)
...@@ -202,7 +203,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType) ...@@ -202,7 +203,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType)
dic_info_lrb = { dic_info_lrb = {
"name": info_name, "name": info_name,
'enName': info_name_en, 'enName': info_name_en,
"value": info_data "value": info_data,
"unit": "元"
} }
list_lrb.append(dic_info_lrb) list_lrb.append(dic_info_lrb)
...@@ -228,7 +230,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType) ...@@ -228,7 +230,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType)
dic_info_xjllb = { dic_info_xjllb = {
"name": info_name, "name": info_name,
'enName': info_name_en, 'enName': info_name_en,
"value": info_data "value": info_data,
"unit": "元"
} }
list_xjllb.append(dic_info_xjllb) list_xjllb.append(dic_info_xjllb)
...@@ -356,7 +359,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType) ...@@ -356,7 +359,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType)
dic_info_zcfzb = { dic_info_zcfzb = {
"name": info_name, "name": info_name,
'enName': info_name_en, 'enName': info_name_en,
"value": info_data "value": info_data,
"unit": '元'
} }
list_zcfzb.append(dic_info_zcfzb) list_zcfzb.append(dic_info_zcfzb)
...@@ -382,7 +386,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType) ...@@ -382,7 +386,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType)
dic_info_lrb = { dic_info_lrb = {
"name": info_name, "name": info_name,
'enName': info_name_en, 'enName': info_name_en,
"value": info_data "value": info_data,
'unit': '元'
} }
list_lrb.append(dic_info_lrb) list_lrb.append(dic_info_lrb)
...@@ -408,7 +413,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType) ...@@ -408,7 +413,8 @@ def get_info(social_code, com_code,info_date,delist_all,info_date_list,taskType)
dic_info_xjllb = { dic_info_xjllb = {
"name": info_name, "name": info_name,
'enName': info_name_en, 'enName': info_name_en,
"value": info_data "value": info_data,
'unit':'元'
} }
list_xjllb.append(dic_info_xjllb) list_xjllb.append(dic_info_xjllb)
......
@@ -8,10 +8,8 @@ import pymysql
import redis
import requests
from bs4 import BeautifulSoup
from requests.packages import urllib3
from retry import retry
from base import BaseCore

urllib3.disable_warnings()
@@ -20,10 +18,7 @@ log = baseCore.getLogger()
cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                      charset='utf8mb4')
cursor = cnx.cursor()
r = baseCore.r
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
@@ -86,7 +81,7 @@ def getUnit(gpdm):
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'lxml')
    unit = soup.find('div', class_='financials__note').text.split(' ')[1].lstrip().strip()
    unit = f'{unit}(千)'
    req.close()
    return unit
@@ -104,9 +99,11 @@ def getlist(table, tableName):
            value = re.sub(r"[^\d+-]", "", value)
        else:
            value = '-'
        date_ = years[f'value{i}']
        if date_:
            date = date_.split('/')[2] + '-' + date_.split('/')[0] + '-' + \
                   date_.split('/')[1]
            list.append({f'{tableName}': name, 'value': value, 'date': date, })
    return list
@@ -136,13 +133,12 @@ def reviseData(lists, unit, tableName):

# fetch annual financial data
def getYear(start_time, social_code, gpdm):
    ynFirst = check_code(social_code)
    date_list = []
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=1'
    try:
        req = requests.get(url, headers=headers, verify=False)
        data = req.json()['data']
        if data:
            unit = getUnit(gpdm)
@@ -162,6 +158,7 @@ def getYear(start_time, social_code, gpdm):
            # check whether this report period has been collected
            panduan = check_date(social_code, date + '-year')
            if panduan:
                log.info(f'{social_code}=={gpdm}=={date}年度数据采集过')
                continue
            xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
            zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
@@ -177,13 +174,15 @@ def getYear(start_time, social_code, gpdm):
                "ynFirst": ynFirst,
            }
            sendData(start_time, social_code, gpdm, dic_info)
            log.info(f'{social_code}=={gpdm}=={date}年度财务数据采集成功')
            date_list.append(date + '-year')
        else:
            log.error(f'找不到{social_code}=={gpdm}年度财务数据')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据')
    except Exception as e:
        # push the code back so this company is retried later
        r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', social_code)
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败')
@@ -192,13 +191,12 @@ def getYear(start_time, social_code, gpdm):

# fetch quarterly financial data; if a date coincides with an annual report date, set the dateFlag field to year
def getQuarter(start_time, social_code, gpdm):
    ynFirst = check_code(social_code)
    date_list = []
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=2'
    try:
        req = requests.get(url, headers=headers, verify=False, timeout=60)
        data = req.json()['data']
        if data:
            unit = getUnit(gpdm)
@@ -217,6 +215,7 @@ def getQuarter(start_time, social_code, gpdm):
            # check whether this report period has been collected
            panduan = check_date(social_code, date + '-quarter')
            if panduan:
                log.info(f'{social_code}=={gpdm}=={date}季度数据采集过')
                continue
            xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
            zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
@@ -236,13 +235,16 @@ def getQuarter(start_time, social_code, gpdm):
            if panduan_flag:
                dic_info['dateFlag'] = 'year'
            sendData(start_time, social_code, gpdm, dic_info)
            log.info(f'{social_code}=={gpdm}=={date}季度财务数据采集成功')
            date_list.append(date + '-quarter')
        else:
            log.error(f'{social_code}=={gpdm}无季度财务数据')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据')
    except Exception as e:
        # push the code back so this company is retried later
        r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', social_code)
        log.error(f'{social_code}=={gpdm}===季度财务数据访问失败')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败')
@@ -250,36 +252,55 @@ def getQuarter(start_time, social_code, gpdm):
    return date_list
# push the credit codes into redis
def FinanceFromNasdaq():
    sql = "select xydm from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
    cursor.execute(sql)
    finance = cursor.fetchall()
    finance_list = [item[0] for item in finance]
    for item in finance_list:
        r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', item)
    print('redis放入成功')

def getInfomation(social_code):
    sql = f"select * from mgzqyjwyh_list where state=2 and xydm='{social_code}';"
    cursor.execute(sql)
    data = cursor.fetchone()
    return data
def doJob():
    while True:
        social_code = baseCore.redicPullData('FinanceFromNasdaq:nasdaqfinance_socialCode')
        if not social_code or social_code == None:
            log.info('============已没有数据============等待===============')
            time.sleep(600)
            continue
        data_enterprise = getInfomation(social_code)
        start_time = time.time()
        gpdm = data_enterprise[3]
        social_code = data_enterprise[6]
        # print(gpdm, social_code)
        # collect the annual data
        date_list_year = getYear(start_time, social_code, gpdm)
        # save the annual report periods to redis
        add_date(social_code, date_list_year)
        # collect the quarterly data
        date_list_quarter = getQuarter(start_time, social_code, gpdm)
        # save the quarterly report periods to redis
        add_date(social_code, date_list_quarter)
        timeCost = baseCore.getTimeCost(start_time, time.time())
        state = 1
        baseCore.recordLog(social_code, taskType, state, timeCost, '', '')
        log.info(f'{social_code}=={gpdm}==耗时{timeCost}')
        time.sleep(2)

if __name__ == '__main__':
    # collect the financial data
    doJob()
    # push the company stock codes into redis
    # FinanceFromNasdaq()
    cursor.close()
    cnx.close()
# -*- coding: utf-8 -*-
import time
from urllib.parse import quote
import requests
import urllib3
from BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
# headers = {
# 'Host': 'xcx.qcc.com',
# 'Connection': 'keep-alive',
# 'Qcc-Platform': 'mp-weixin',
# 'Qcc-Timestamp': '',
# 'Qcc-Version': '1.0.0',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
# 'content-type': 'application/json',
# 'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
# 'Accept-Encoding': 'gzip, deflate, br,'
# }
headers = {
'Host': 'xcx.qcc.com',
'Connection': 'keep-alive',
'x-request-device-type': 'Android',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8391',
'Content-Type': 'application/json',
'Qcc-Version': '1.0.0',
'authMini': 'Bearer f51dae1a2fcb109fa9ec58bd4a85e5c5',
'xweb_xhr': '1',
'xcx-version': '2023.09.27',
'Qcc-Platform': 'mp-weixin',
'Qcc-CurrentPage': '/company-subpackages/business/index',
'Qcc-Timestamp': '1696661787803',
'Qcc-RefPage': '/company-subpackages/detail/index',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://servicewechat.com/wx395200814fcd7599/307/page-frame.html',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh'
}
# look up the Qichacha (qcc) company id by company name or credit code
def find_id_by_name(start,token,name):
urllib3.disable_warnings()
qcc_key = name
t = str(int(time.time()) * 1000)
headers['Qcc-Timestamp'] = t
url = f"https://xcx.qcc.com/mp-weixin/forwardApp/v3/base/advancedSearch?token={token}&t={t}&pageIndex=1&needGroup=yes&insuredCntStart=&insuredCntEnd=&startDateBegin=&startDateEnd=&registCapiBegin=&registCapiEnd=&countyCode=&province=&sortField=&isSortAsc=&searchKey={quote(qcc_key)}&searchIndex=default&industryV3="
for lll in range(1, 6):
try:
resp_dict = requests.get(url=url, headers=headers, verify=False).json()
break
except Exception as e:
print(f'{e}-------------重试')
time.sleep(5)
continue
time.sleep(2)
#{'status': 40101, 'message': '无效的sessionToken!'} {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'}
if resp_dict['status']==40101:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
if resp_dict['status']==401:
KeyNo = False
log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
try:
if resp_dict['result']['Result']:
result_dict = resp_dict['result']['Result'][0]
KeyNo = result_dict['KeyNo']
Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
if Name == '':
KeyNo = 'null'
else:
KeyNo = 'null'
except:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start,time.time())}')
return KeyNo
log.info("{},企业代码为:{}".format(qcc_key, KeyNo))
return KeyNo
\ No newline at end of file
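A hypothetical call of find_id_by_name, assuming a valid mini-program session token (the token and company name below are placeholders):

import time

token = '<valid-qcc-session-token>'  # placeholder
KeyNo = find_id_by_name(time.time(), token, '小米科技有限责任公司')
if KeyNo and KeyNo != 'null':
    print(f'qcc id: {KeyNo}')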
"""
新浪财经美股企业动态
"""
import json
import time
import jieba
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from base.smart import smart_extractor
from base.BaseCore import BaseCore
# initialization: load jieba for Chinese word segmentation
jieba.cut("必须加载jieba")
smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache'
}
taskType = '新浪财经/天眼查'
# fetch the company info
def getinfomation(social_code):
selectSql = f"select * from mgzqjywyh_list where state = '2' and xydm='{social_code}' "
cursor.execute(selectSql)
data = cursor.fetchone()
cnx.commit()
data = list(data)
cursor.close()
cnx.close()
return data
# fetch and parse the response page
@retry(tries=3, delay=1)
def getrequests(url):
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'html.parser')
return soup
# parse the content
def getDic(social_code, li):
start_time = time.time()
title = li.find('a').text
href = li.find('a').get('href')
tag_at = li.find('span', class_='xb_list_r').text
author = tag_at.split('|')[0].lstrip().strip()
pub_time = tag_at.split('|')[1].lstrip().strip()
pub_time = pub_time.split(' ')[0].replace('年', '-').replace('月', '-').replace('日', '')
if 'http' not in href:
href = 'https://finance.sina.com.cn' + href
href_ = href.replace('https', 'http')
    try:
        # body with tags
        contentText = smart.extract_by_url(href_).text
        # body without tags
        content = smart.extract_by_url(href_).cleaned_text
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
except:
log.error(f'{href}===页面解析失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
return
dic_news = {
'attachmentIds': '',
'author': author,
'content': content,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '新浪财经',
'publishDate': pub_time,
'sid': '1684032033495392257',
        'sourceAddress': href,  # original article link
'summary': '',
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': pub_time[:4]
}
# print(dic_news)
try:
sendKafka(dic_news, start_time)
log.info(f'Kafka发送成功')
try:
insertMysql(social_code, href)
log.info(f'数据库保存成功')
except:
log.error(f'{href}===数据入库失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
except:
log.error(f'{href}===发送Kafka失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
# send the data to Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
    dic_result = {
        'success': 'true',
        'message': '操作成功',
        'code': '200',
    }
    log.info(dic_result)
    # sent successfully; write it to the log
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
# save to the database for de-duplication
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
    # field list for the news record
list_info = [
social_code,
link,
'新浪财经',
'2',
]
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
# check whether this news item has been collected
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
cursor.execute(sel_sql, (url, social_code))
selects = cursor.fetchone()
return selects
def doJob():
    # while True:
    #     social_code = ''
    #     # pull the company credit code from redis
    #     try:
    #         data = getinfomation(social_code)
    #         com_code = data[6]
    com_code = 'AAPL'
    social_code = 'ZZSN22080900000004'
    log.info(f'{social_code}==={com_code}===开始采集')
start_time = time.time()
pageIndex = 1
    while True:
        # build the page url
        # url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1'
        url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
        soup_home = getrequests(url)
        li_list = soup_home.select('body > div > div.xb_news > ul > li')
        # the li list may come back empty on the first try
        for i in range(5):
            if len(li_list) == 0:
                li_list = soup_home.select('body > div > div.xb_news > ul > li')
            else:
                break
for li in li_list:
title = li.find('a').text
if title == '':
continue
href = li.find('a').get('href')
selects = selectUrl(href, social_code)
if selects:
log.info(f'{url}==已采集过')
else:
getDic(social_code, li)
break
break
        # if an already-collected item shows up, all newer items have already been collected
        # (used for incremental runs)
        # if selects:
        #     break
next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
for i in range(5):
if len(next) == 0:
next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
else:
break
if len(next) == 2:
break
pageIndex += 1
time.sleep(2)
log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time,time.time())}')
    # except:
    #     log.info(f'==={social_code}=====获取企业信息失败====')
    #     # push back into redis
    #     baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
    #     state = 0
    #     takeTime = baseCore.getTimeCost(start, time.time())
    #     baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
    #     time.sleep(5)
if __name__ == "__main__":
doJob()
@@ -33,7 +33,7 @@ def updatewxLink(link,info_source_code,state):

def getjsonInfo():
    # fetch one record from the database
    select_sql = "select * from wx_link where state=0 order by id asc limit 1"
    cursor_.execute(select_sql)
    row = cursor_.fetchone()
    cnx_.commit()
...
# created by virtualenv automatically
*
import gc
from flask import Flask, render_template, request, current_app
import configparser
from controller.Main import Main  # import all blueprint variables
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
from dao.Conn import ConnMySql
import sys
import io
# clear stale login state
def clearLoginStateIn24H():
conn = ConnMySql()
conn.userClearLoginStateIn24H()
print("清除登录状态-" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
app = Flask(__name__)  # create the Flask app
app.register_blueprint(Main)  # register all blueprint objects on the app
# maximum upload size: 16 MB
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
# app config entries, key = section name + key name, e.g. db.port=3306
cfg = configparser.ConfigParser()
cfg.optionxform = str  # preserve the case of keys in the config file
cfg.read("static/conf/sys.ini", encoding='utf-8')
sections = cfg.sections()
for section in sections:
items = cfg.items(section)
for key, val in items:
app.config[section + '.' + key] = val
# special handling for individual values
app.config['db.port'] = int(app.config['db.port'])
if app.config['sys.useProxy'] == "0":
app.config['sys.useProxy'] = False
else:
app.config['sys.useProxy'] = True
app.config['sys.proxyid'] = 0  # id of the proxy currently in use
app.config['sys.userid'] = 0  # id of the account currently in use
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
if __name__ == '__main__':
# webbrowser.open("0.0.0.0:5000")
    app.run(host='0.0.0.0', port=5201, debug=True)  # entry point
    # scheduled task: clear stale login state every half hour
# sched = BlockingScheduler()
# sched.add_job(clearLoginStateIn24H, 'interval', seconds=1800, id='task-clearLoginStateIn24H')
# sched.start()
import gc
from flask import Blueprint, request, current_app, make_response, send_file # 导入蓝图
import datetime
import re
import os
import logging
import sys
import io
import tempfile
import openpyxl
import string
import json
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium import webdriver
from selenium.webdriver.common.by import By
from util import UtilDate
from service.Service02 import Service02
Main = Blueprint('Main', __name__)  # create a blueprint rather than a Flask object

# Accept a request, read the JSON parameters from the body, and crawl accordingly.
# Body: {"from":"1900-01-01","last":<number of recent days>, "orgs":["org 1 full name","org 2 full name","org 3 full name",...]}
@Main.route('/Main/getData', methods=["POST"])
def getData():
print("POST /Main/getData")
paras = request.get_json(force=True)
dateFrom = paras['from']
lastDays = paras['last']
orgs = paras['orgs']
if dateFrom == "":
if lastDays == "":
lastDays = 0
else:
lastDays = -(int(lastDays) - 1)
dateFrom = UtilDate.dateAdd("", "d", lastDays)
service02 = Service02()
return service02.getData(dateFrom, orgs) #"https://wenshu.court.gov.cn/website/wenshu/181029CR4M5A62CH/index.html"
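# For illustration, a minimal sketch of calling this endpoint (assumes the app is
# running locally on port 5201 as configured in app.py; the org name is hypothetical):
#
#   import requests
#   body = {"from": "", "last": "7", "orgs": ["某某科技有限公司"]}
#   resp = requests.post("http://127.0.0.1:5201/Main/getData", json=body)
#   # with "from" empty and "last" = 7, dateFrom resolves to 6 days ago (today inclusive)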
class ProxyDao():
    # placeholder DAO; not yet implemented
    def t(self):
        pass
# basic case information
from util import UtilDate
from util import UtilNumber

class BaseInfo:
    info_title: str    # title
    key_word: str      # keywords
    info_bianhao: str  # case number
    info_address: str  # court of jurisdiction
    info_time: str     # publication date, yyyy-mm-dd
    info_id: str       # case ID
    info_yuanyou: str  # grounds for the ruling
    info_content: str  # body text
    # whether this record's date is on or after the given date (inclusive)
    def isAfter(self, sDate: str) -> bool:
        if sDate == "":
            return False
        # zero-padded yyyy-mm-dd strings compare correctly as plain strings
        return self.info_time >= sDate
def toString(self):
return self.info_title + "\t" + self.key_word + "\t" + self.info_bianhao + "\t" + self.info_address + "\t" + self.info_time + "\t" + self.info_id
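    # For illustration (field values are hypothetical):
    #   info = BaseInfo()
    #   info.info_time = "2023-05-01"
    #   info.isAfter("2023-01-01")  # True  - ISO dates compare correctly as strings
    #   info.isAfter("")            # False - an empty filter date always fails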
home = C:\Program Files\Python
implementation = CPython
version_info = 3.8.0.final.0
virtualenv = 20.13.0
include-system-site-packages = true
base-prefix = C:\Program Files\Python
base-exec-prefix = C:\Program Files\Python
base-executable = C:\Program Files\Python\python.exe
# system configuration
[sys]
# OCR service URL, used to recognize the CAPTCHA on wenshu.court.gov.cn
ocrUrl=http://114.116.49.86:8013/wzsb_app?withCrLf=false
# login mode: 0 = no login, 1 = account login (password/SMS/CAPTCHA selectors must be set as needed), 2 = cookie login
loginMode=1
# use a proxy: 0 = no, 1 = yes; sites that require login generally should not use a proxy
useProxy=1
# CAPTCHA recognition: 0 = off, 1 = on; currently one fixed recognition method, to be extended later
verifiCode=0
# login URL ?open=login
loginUrl=https://wenshu.court.gov.cn/website/wenshu/181010CARHS5BS3C/index.html
# main URL; the site may redirect here after login
mainUrl=https://wenshu.court.gov.cn
# login - username input
loginUser=#root > div > form > div > div:nth-child(1) > div > div > div > input
# login - password input
loginPasswd=#root > div > form > div > div:nth-child(2) > div > div > div > input
# the image CAPTCHA is on a separate page; after correct input the site returns to the login page
# login - CAPTCHA input box; when non-empty, CAPTCHA recognition is required
loginCaptchaInput=body > div > div.card-body > div > form > div.captcha > input
# login - CAPTCHA image
loginCaptchaImage=#Image1
# login - CAPTCHA confirm button
loginCaptchaButton=body > div > div.card-body > div > form > div.warnbtn > input
# login - SMS code; may be required together with the image CAPTCHA, not handled yet
loginSMSCode=
# login button on the main page
loginButton=#loginLi > a
# confirm-login button on the login page
loginOk=#root > div > form > div > div.login-button-container > span

# database configuration
[db]
host=114.115.159.144
port=3306
user=caiji
passwd=zzsn9988
db=caiji
charset=utf8

# CSS selector configuration
[css]
# search - input box
searchInput=#_view_1540966814000 > div > div.search-wrapper.clearfix > div.search-middle > input
# search - button
searchButton=#_view_1540966814000 > div > div.search-wrapper.clearfix > div.search-rightBtn.search-click
# list - sort-by-date-descending button
listDateSort=#_view_1545184311000 > div.LM_tool.clearfix > div:nth-child(2) > a
# list - number of cases
listCount=#_view_1545184311000 > div.LM_con.clearfix > div.fr.con_right > span
# list - case title
listTitle=#_view_1545184311000 > div:nth-child(?) > div.list_title.clearfix > h4 > a
# list - case number
listBianhao=#_view_1545184311000 > div:nth-child(?) > div.list_subtitle > span.ah
# list - court
listAddress=#_view_1545184311000 > div:nth-child(?) > div.list_subtitle > span.slfyName
# list - date of judgment
listTime=#_view_1545184311000 > div:nth-child(?) > div.list_subtitle > span.cprq
# list - cause of action
listYuanyou=#_view_1545184311000 > div:nth-child(?) > div.list_reason > p
# next-page button
listNextPage=#_view_1545184311000 > div.left_7_3 > a:last-child
# body - link, usually the same as the title
contLink=#_view_1545184311000 > div:nth-child(?) > div.list_title.clearfix > h4 > a
# body - content
contContent=#_view_1541573883000 > div > div.PDF_box > div.PDF_pox
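# For illustration: the "?" in the list selectors above presumably stands for the
# 1-based row index of a result item and is substituted at crawl time. A minimal
# sketch of that substitution (the helper name is hypothetical, not from this repo):
#   def rowSelector(template: str, index: int) -> str:
#       return template.replace("?", str(index))
#   rowSelector(listTitle, 3)
#   # -> '#_view_1545184311000 > div:nth-child(3) > div.list_title.clearfix > h4 > a'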
# CAPTCHA recognition; currently only handles the wenshu.court.gov.cn CAPTCHA
import json
import os
import uuid
from pathlib import Path

import requests
from flask import current_app
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.webdriver import WebDriver

# selector: CSS selector of the CAPTCHA image
def getCaptchaMode1(browser: WebDriver, selector: str):
    ret = ""
    out_path = "./Temp_file"
    try:
        Path(out_path).mkdir(parents=True, exist_ok=True)
        # screenshot the CAPTCHA element and save it as a local png
        path_name = os.path.join(out_path, str(uuid.uuid4())) + ".png"
        print(path_name)
        img = browser.find_element(By.CSS_SELECTOR, selector)
        img.screenshot(path_name)
        # # alternative: send a GET request to the image URL; the response body is the image
        # r = requests.get(imgUrl)
        # with open(path_name, 'wb') as f:
        #     f.write(r.content)
        # call the OCR service with the saved image
        ocrUrl = current_app.config['sys.ocrUrl']
        with open(path_name, "rb") as file:
            response = requests.post(ocrUrl, files={"multiRequest": file})
        os.remove(path_name)
        # response: {"code":200,"logs":null,"message":"success","resultData":"2rVK"}
        oRet = json.loads(response.text)
        ret = oRet["resultData"]
        print(ret)
    except Exception as err:
        print('getCaptchaMode1 error:', err)
    return ret
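# For illustration, a minimal sketch of wiring this into the login flow ("browser"
# is assumed to be an existing Selenium WebDriver; selectors come from sys.ini):
#
#   code = getCaptchaMode1(browser, current_app.config['sys.loginCaptchaImage'])
#   if code:
#       captcha_input = browser.find_element(
#           By.CSS_SELECTOR, current_app.config['sys.loginCaptchaInput'])
#       captcha_input.send_keys(code)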
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# convert a date like "yyyy年m月d日" to "yyyy-mm-dd"
def convertDate(sDate: str):
    sDate = sDate.replace("年", "-")
    sDate = sDate.replace("月", "-")
    sDate = sDate.replace("日", "")
    date_obj = datetime.strptime(sDate, '%Y-%m-%d')
    return date_obj.strftime('%Y-%m-%d')

# add an offset to a date; ymd is the unit: y = years, m = months, d = days
# an empty sDate means "today"; relativedelta/timedelta handle negative offsets directly
def dateAdd(sDate: str, ymd: str = "d", diff: int = 1):
    if sDate == "":
        sDate = datetime.now().strftime('%Y-%m-%d')
    date_obj = datetime.strptime(sDate, '%Y-%m-%d')
    if ymd == "y":
        date_obj = date_obj + relativedelta(years=diff)
    elif ymd == "m":
        date_obj = date_obj + relativedelta(months=diff)
    elif ymd == "d":
        date_obj = date_obj + timedelta(days=diff)
    return date_obj.strftime('%Y-%m-%d')
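# For illustration:
#   convertDate("2023年1月5日")     # -> "2023-01-05"
#   dateAdd("2023-01-05", "m", 2)   # -> "2023-03-05"
#   dateAdd("2023-01-05", "d", -6)  # -> "2022-12-30"
#   dateAdd("", "d", -6)            # -> six days before today, as used by getData()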
# numeric helpers
# convert an amount string to a number; the string may contain units such as 万元 or 人民币
# NOTE: stripping 万/亿 without scaling would silently drop the magnitude, so the
# scale is applied here on the assumption that the true numeric value is wanted
def convertMoney(sMoney: str):
    multiplier = 1.0
    if "亿" in sMoney:
        multiplier = 1e8
    elif "万" in sMoney:
        multiplier = 1e4
    for unit in ("亿", "万", "人民币", "元"):
        sMoney = sMoney.replace(unit, "")
    return float(sMoney) * multiplier
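# For illustration:
#   convertMoney("500万元")      # -> 5000000.0 with the scaling above
#   convertMoney("1.2亿元")      # -> 120000000.0
#   convertMoney("人民币300元")  # -> 300.0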
from selenium.webdriver.chrome.webdriver import WebDriver
from seleniumwire import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By

# IP proxy pool
class UtilProxy:
    id: int
    ip: str
    port: str
    name: str
    password: str

    # switch to another proxy IP (not yet implemented)
    def alterIP(self, browser: WebDriver):
        pass
# account information
from util import UtilDate
from util import UtilNumber

class LoginInfo:
    id: int  # record id
    user_group: str
    user_name: str
    user_passwd: str
# proxy IP information
from util import UtilDate
from util import UtilNumber

class ProxyInfo:
    id: int  # record id
    ip: str
    port: str
    user_name: str
    user_passwd: str
import glob

import pandas as pd

# find all .xlsx report workbooks under the directory
excel_files = glob.glob(r"D:\机械项目研报\机械项目研报*.xlsx", recursive=True)

# read each workbook, then merge with pd.concat (DataFrame.append was removed in pandas 2.0)
frames = []
for file in excel_files:
    data = pd.read_excel(file, dtype=str)
    # # drop the last column
    # data = data.iloc[:, :-1]
    frames.append(data)
merged_data = pd.concat(frames, ignore_index=True)

grouped = merged_data.groupby('industry')

# write the merged data to a new workbook, one sheet per industry group
# merged_data.to_csv(r"D:\hg\tmp\11.csv", encoding='gbk', index=False, quoting=1, quotechar='"', escapechar='\\')
# merged_data.to_excel(r"D:\机械项目研报\机械项目研报汇总.xlsx", index=False, engine='openpyxl')
with pd.ExcelWriter(r'D:\机械项目研报\机械项目研报汇总2.xlsx') as writer:
    for group_name, group_df in grouped:
        group_df.to_excel(writer, sheet_name=group_name, index=False)