Commit 63cac106 Author: LiuLiYuan

Merge remote-tracking branch 'origin/master'

@@ -5,22 +5,18 @@ import socket
import sys
import time
-import fitz
import logbook
import logbook.more
import pandas as pd
import requests
import zhconv
-import pymysql
import redis
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
-from openpyxl import Workbook
import langid
#创建连接池
import pymysql
-from pymysql import connections
from DBUtils.PooledDB import PooledDB
# import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client')
@@ -28,6 +24,15 @@ from DBUtils.PooledDB import PooledDB
from fdfs_client.client import get_tracker_conf, Fdfs_client
tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\base\\client.conf')
client = Fdfs_client(tracker_conf)
+from obs import ObsClient
+import fitz
+from urllib.parse import unquote
+obsClient = ObsClient(
+    access_key_id='VEHN7D0TJ9316H8AHCAV',  # 你的华为云的ak码
+    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # 你的华为云的sk
+    server='https://obs.cn-north-1.myhuaweicloud.com'  # 你的桶的地址
+)
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
@@ -659,12 +664,10 @@ class BaseCore:
create_time = retData['create_time']
order_by = num
selects = self.secrchATT(item_id,year,type_id)
-# sel_sql = '''select id,item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
-# self.cursor.execute(sel_sql, (item_id, year,type_id))
-# selects = self.cursor.fetchone()
if selects:
-    self.getLogger().info(f'com_name:{com_name}已存在')
-    id = selects[0]
+    self.getLogger().info(f'com_name:{com_name}--{year}已存在')
+    id = ''
    return id
else:
    Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
@@ -695,6 +698,80 @@ class BaseCore:
log = self.getLogger()
log.info('======保存企业CIK失败=====')
#上传至obs华为云服务器,并解析破地方的内容和页数
# 获取文件大小
def convert_size(self,size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units) - 1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def obsexist(self,file_path):
# # 文件路径
# file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc'
# 检查文件是否存在
response = obsClient.getObjectMetadata('zzsn', file_path)
if response.status >= 300:
self.getLogger().info('=====文件不存在obs=====')
else:
self.getLogger().info(f'=====文件存在obs========{file_path}')
def uptoOBS(self,pdf_url, name_pdf,type_id, social_code,pathType,taskType,start_time):
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
for i in range(0, 3):
try:
name = name_pdf + '.pdf'
now_time = time.strftime("%Y-%m")
result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
break
except:
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = self.convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = self.getTimeCost(start_time, time.time())
self.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
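For orientation, a minimal sketch of how this new uptoOBS helper would be driven from a collector and how its retData is consumed, assuming an initialised BaseCore instance (the real constructor opens database and redis connections); the URL, credit code and names below are placeholders, not values from this commit:

import time

baseCore = BaseCore()          # requires the DB/redis endpoints configured in this module
start_time = time.time()

retData = baseCore.uptoOBS(
    'https://example.com/demo.pdf',   # placeholder download link
    'demo_report',                    # attachment name without extension
    1,                                # type_id
    '91110000000000000X',             # placeholder social credit code
    'XQWAnnualReport/',               # pathType: OBS key prefix
    '企业年报/示例',                   # taskType used for recordLog
    start_time,
)

if retData['state']:
    # download, OBS upload and PDF parsing all succeeded
    att_id = baseCore.tableUpdate(retData, 'demo_company', '2022', 'demo_report', 1)
else:
    # three attempts failed or the PDF could not be parsed; nothing was stored
    baseCore.getLogger().info('pdf upload or parse failed')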
......
@@ -475,7 +475,14 @@ def kegaishifan():
#双百企业
def shuangbaiqiye():
-    pass
+    cnx, cursor = connectSql()
+    query = "SELECT CompanyName FROM Hundred"
+    cursor.execute(query)
+    result = cursor.fetchall()
+    cnx.commit()
+    com_namelist = [item[0] for item in result]
+    for item in com_namelist:
+        r.rpush('hundred:baseinfo', item)
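For context, a small sketch of the producer/consumer round trip this function sets up with the 'hundred:baseinfo' list; the redis connection parameters and company names are placeholders (the excerpt does not show how the module-level r is built):

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # placeholder connection

# producer side (what shuangbaiqiye does): queue company names for the crawler
for name in ['示例企业一', '示例企业二']:
    r.rpush('hundred:baseinfo', name)

# consumer side: a worker pops one name at a time until the list is drained
while True:
    item = r.lpop('hundred:baseinfo')
    if item is None:
        break
    com_name = item.decode('utf-8')
    # ... crawl base info for com_name ...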
#专精特新
def zhuangjingtexind():
@@ -484,7 +491,8 @@ def zhuangjingtexind():
if __name__ == "__main__":
    start = time.time()
    # danxiangguanjun()
-    kegaishifan()
+    # kegaishifan()
+    shuangbaiqiye()
    # NoticeEnterprise()
    # AnnualEnterpriseIPO()
    # AnnualEnterprise()
......
import json
+import random
import requests, time, pymysql
import jieba
import sys
@@ -45,24 +47,21 @@ def beinWork(tyc_code, social_code,start_time):
retData = {'total': 0, 'up_okCount': 0, 'up_errorCount': 0, 'up_repetCount': 0}
t = time.time()
url = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={t}&id={tyc_code}&ps={pageSize}&pn=1&emotion=-100&event=-100'
-for m in range(0, 3):
-    try:
+try:
+    for m in range(0, 3):
        ip = baseCore.get_proxy()
        headers['User-Agent'] = baseCore.getRandomUserAgent()
        response = requests.get(url=url, headers=headers, proxies=ip, verify=False)
-        # time.sleep(random.randint(3, 5))
+        time.sleep(random.randint(3, 5))
        break
-    except Exception as e:
-        pass
-if (response.status_code == 200):
-    pass
-else:
+    if (response.status_code == 200):
+        pass
+except Exception as e:
    log.error(f"{tyc_code}-----获取总数接口失败")
-    e = '获取总数接口失败'
+    error = '获取总数接口失败'
    state = 0
    takeTime = baseCore.getTimeCost(start_time, time.time())
-    baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
+    baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{error}----{e}')
    return retData
try:
    json_1 = json.loads(response.content.decode('utf-8'))
@@ -177,7 +176,7 @@ def beinWork(tyc_code, social_code,start_time):
pass
continue
try:
-    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
+    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,create_time) values(%s,%s,%s,%s,%s,now())'''
    # 动态信息列表
    up_okCount = up_okCount + 1
    list_info = [
@@ -185,6 +184,7 @@ def beinWork(tyc_code, social_code,start_time):
        link,
        '天眼查',
        '2',
+        time_format
    ]
    cursor_.execute(insert_sql, tuple(list_info))
    cnx_.commit()
@@ -214,10 +214,10 @@ def beinWork(tyc_code, social_code,start_time):
}
except Exception as e:
    log.info(f'传输失败:{social_code}----{link}')
-    e = '数据库传输失败'
+    error = '数据库传输失败'
    state = 0
    takeTime = baseCore.getTimeCost(start_time, time.time())
-    baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
+    baseCore.recordLog(social_code, taskType, state, takeTime, link, f'{error}----{e}')
    continue
# print(dic_news)
# 将相应字段通过kafka传输保存
......
import json
@@ -21,6 +21,7 @@ tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/证监会'
+pathType = 'ZJHAnnualReport/'
def RequestUrl(url, payload, item_id, start_time):
    # ip = get_proxy()[random.randint(0, 3)]
@@ -43,26 +44,26 @@ def RequestUrl(url, payload, item_id, start_time):
    return soup
-def tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
-                create_by, create_time, page_size):
-    sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=1'''
-    cursor_.execute(sel_sql, (item_id, year))
-    selects = cursor_.fetchone()
-    if selects:
-        print(f'{name_pdf},{year}已存在')
-    else:
-        Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
-        values = (
-            year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
-            create_by,
-            create_time, page_size)
-        cursor_.execute(Upsql, values)  # 插入
-        cnx.commit()  # 提交
-        print("更新完成:{}".format(Upsql))
+# def tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
+#                 create_by, create_time, page_size):
+#     sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=1'''
+#     cursor_.execute(sel_sql, (item_id, year))
+#     selects = cursor_.fetchone()
+#     if selects:
+#         print(f'{name_pdf},{year}已存在')
+#     else:
+#         Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+#         values = (
+#             year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
+#             create_by,
+#             create_time, page_size)
+#         cursor_.execute(Upsql, values)  # 插入
+#         cnx.commit()  # 提交
+#         print("更新完成:{}".format(Upsql))
# 采集信息
def SpiderByZJH(url, payload, dic_info, num, start_time):
@@ -121,19 +122,24 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
cursor_.execute(sel_sql, (item_id, year))
selects = cursor_.fetchone()
if selects:
-    print(f'com_name:{short_name}、{year}已存在')
+    log.info(f'com_name:{short_name}、{year}已存在')
    continue
else:
-    retData = baseCore.upLoadToServe(pdf_url, 1, social_code)
+    retData = baseCore.uptoOBS(pdf_url,name_pdf, 1, social_code,pathType,taskType,start_time)
+    if retData['state']:
+        pass
+    else:
+        log.info(f'====pdf解析失败====')
+        return False
    #插入数据库获取att_id
    num = num + 1
    att_id = baseCore.tableUpdate(retData, short_name, year, name_pdf, num)
-    content = retData['content']
-    if retData['state']:
+    if att_id:
        pass
    else:
-        log.info(f'====pdf解析失败====')
        return False
+    content = retData['content']
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
    'attachmentIds': att_id,
@@ -169,7 +175,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
    'message': '操作成功',
    'code': '200',
}
-print(dic_result)
+log.info(dic_result)
return True
except Exception as e:
    dic_result = {
@@ -181,7 +187,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
-print(dic_result)
+log.info(dic_result)
return False
else:
    continue
@@ -311,7 +317,8 @@ if __name__ == '__main__':
    time.sleep(20)
    continue
dic_info = baseCore.getInfomation(social_code)
-count = dic_info[15]
+count = dic_info[16]
+log.info(f'====正在采集{social_code}=====')
# 沪市http://eid.csrc.gov.cn/101111/index.html 深市http://eid.csrc.gov.cn/101811/index.html 北交所http://eid.csrc.gov.cn/102611/index.html
# url 变量 翻页 栏目 http://eid.csrc.gov.cn/101811/index_3_f.html
url_parms = ['101111', '101811', '102611']
@@ -322,7 +329,7 @@ if __name__ == '__main__':
dic_parms = getUrl(code, url_parms, Catagory2_parms)
SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, num, start_time)
end_time = time.time()
-print(f'{dic_info[4]} ---- 该企业耗时 ---- {end_time - start_time}')
+log.info(f'{dic_info[4]} ---- 该企业耗时 ---- {end_time - start_time}')
count += 1
runType = 'AnnualReportCount'
baseCore.updateRun(social_code, runType, count)
......
# -*- coding: utf-8 -*-
@@ -152,24 +152,23 @@ def spider_annual_report(dict_info,num):
cursor.execute(sel_sql, (social_code, int(year)))
selects = cursor.fetchone()
if selects:
-    print(f'com_name:{com_name}、{year}已存在')
+    log.info(f'com_name:{com_name}、{year}已存在')
    continue
else:
-    page_size = 0
-    #上传文件至文件服务器
-    retData = baseCore.upLoadToServe(pdf_url,1,social_code)
+    #上传文件至obs服务器
+    retData = baseCore.uptoOBS(pdf_url,name_pdf,1,social_code,pathType,taskType,start_time)
+    if retData['state']:
+        pass
+    else:
+        log.info(f'====pdf解析失败====')
+        return False
    num = num + 1
    try:
        att_id = baseCore.tableUpdate(retData,com_name,year,name_pdf,num)
        content = retData['content']
-        if retData['state']:
-            pass
-        else:
-            log.info(f'====pdf解析失败====')
-            return False
        state = 1
        takeTime = baseCore.getTimeCost(start_time, time.time())
-        baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '')
+        baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '成功')
    except:
        exception = '数据库传输失败'
        state = 0
@@ -236,6 +235,7 @@ def spider_annual_report(dict_info,num):
if __name__ == '__main__':
    num = 0
    taskType = '企业年报/雪球网'
+    pathType = 'XQWAnnualReport/'
    while True:
        start_time = time.time()
        # 获取企业信息
......
@@ -14,6 +14,12 @@ def conn11():
    cursor = conn.cursor()
    return conn,cursor
def conn144():
conn = pymysql.Connect(host='114.115.159.144', port=3306, user='caiji', passwd='zzsn9988', db='caiji',
charset='utf8')
cursor = conn.cursor()
return conn,cursor
#企业公告
def shizhiCodeFromSql():
    conn,cursor=conn11()
@@ -31,6 +37,7 @@ def shizhiCodeFromSql():
    finally:
        cursor.close()
        conn.close()
#企业公告
def yahooCodeFromSql():
    conn,cursor=conn11()
@@ -49,6 +56,25 @@ def yahooCodeFromSql():
        cursor.close()
        conn.close()
#新浪纽交所股票对应的代码
def sinausstockCodeFromSql():
conn,cursor=conn144()
try:
gn_query = "select ticker from mgzqyjwyh_list where state=2 and exchange='NYSE'; "
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('sinausstockCodeFromSql开始将股票代码放入redis=======')
for item in gn_social_list:
r.rpush('sina_usstock:securities_code', item)
print('sinausstockCodeFromSql将股票代码放入redis结束')
except Exception as e:
log.info("数据查询异常")
finally:
cursor.close()
conn.close()
def yahooCode_task():
    # 实例化一个调度器
    scheduler = BlockingScheduler()
@@ -58,9 +84,12 @@ def yahooCode_task():
    scheduler.add_job(yahooCodeFromSql, 'cron', day='*/3', hour=0, minute=0)
    # 每天执行一次
    scheduler.add_job(shizhiCodeFromSql, 'cron', hour=10,minute=0)
# 每天执行一次
scheduler.add_job(sinausstockCodeFromSql, 'cron', day='*/3', hour=0, minute=0)
    try:
-        yahooCodeFromSql() # 定时开始前执行一次
-        shizhiCodeFromSql() # 定时开始前执行一次
+        # yahooCodeFromSql() # 定时开始前执行一次
+        # shizhiCodeFromSql() # 定时开始前执行一次
+        sinausstockCodeFromSql() # 定时开始前执行一次
        scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
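A side note on the new trigger: with APScheduler's 'cron' trigger, day='*/3' fires on calendar days 1, 4, 7, ... of each month at the given time, restarting at 1 every month, so it is not strictly a once-a-day or every-72-hours schedule. A minimal standalone sketch of the same pattern (the job body here is a stub, not the real sinausstockCodeFromSql):

from apscheduler.schedulers.blocking import BlockingScheduler

def refresh_nyse_ticker_queue():
    print('refresh NYSE ticker queue')  # stand-in for sinausstockCodeFromSql()

scheduler = BlockingScheduler()
# fires on calendar days 1, 4, 7, ... at 00:00
scheduler.add_job(refresh_nyse_ticker_queue, 'cron', day='*/3', hour=0, minute=0)
scheduler.start()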
......
# -*- coding: utf-8 -*-
@@ -373,6 +373,28 @@ class YahooCaiwu(object):
        currency=''
        return currency
#对比指标计算
def calculateIndexReq(self):
get_url = 'http://114.115.236.206:8088/sync/calculateIndex'
try:
params={
'type':2
}
resp = requests.get(get_url,params=params)
print(resp.text)
text=json.loads(resp.text)
codee=text['code']
while codee==-200:
time.sleep(600)
resp = requests.get(get_url)
print(resp.text)
text=json.loads(resp.text)
codee=text['code']
if codee==-200:
break
print('调用接口成功!!')
except:
print('调用失败!')
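Note that the loop above re-requests without the type parameter and exits after a single retry because of the trailing `if codee==-200: break`. A hedged sketch of a bounded polling variant against the same endpoint and payload, for comparison (attempt count and wait time are arbitrary choices):

import json
import time
import requests

def calculate_index_with_retry(max_attempts=5, wait_seconds=600):
    # same endpoint and payload as calculateIndexReq above
    get_url = 'http://114.115.236.206:8088/sync/calculateIndex'
    params = {'type': 2}
    for _ in range(max_attempts):
        resp = requests.get(get_url, params=params)
        code = json.loads(resp.text)['code']
        if code != -200:
            print('调用接口成功!!')
            return True
        # -200 appears to mean "not ready yet": wait, then poll again
        time.sleep(wait_seconds)
    print('调用失败!')
    return False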
if __name__ == '__main__':
    # parse_excel()
    #get_content1()
@@ -383,8 +405,11 @@ if __name__ == '__main__':
            securitiescode=yahoo.getCodeFromRedis()
            yahoo.get_content2(securitiescode)
        except Exception as e:
+            print('没有数据暂停5分钟')
+            yahoo.calculateIndexReq()
            if securitiescode:
                yahoo.r.rpush('NoticeEnterprise:securities_code',securitiescode)
            else:
                time.sleep(300)
-                print('没有数据暂停5分钟')
import configparser
@@ -20,6 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from operator import itemgetter
from itertools import groupby
import datetime
+from decimal import Decimal
class SinaUsstock(object):
@@ -54,13 +55,19 @@ class SinaUsstock(object):
seriesValue=tddoc.find('td').text().split(' ')
for i in range(0,len(pdate)):
    value=seriesValue[i]
-    if '亿' in value:
-        value = value.replace("亿", "*100000000")
-        value = eval(value)
-    elif '万' in value:
-        value = value.replace("万", "*10000")
-        value = eval(value)
-    vvla=str(value)
+    try:
+        if '亿' in value:
+            value = value.replace("亿", "").replace(",", "")
+            value = Decimal(value) * Decimal('100000000')
+            # value = eval(value)
+        elif '万' in value:
+            value = value.replace("万", "").replace(",", "")
+            value = Decimal(value) * Decimal('10000')
+            # value = eval(value)
+    except Exception as e:
+        print(e)
+        print(value)
+    vvla=str(value).replace(",", "")
    serisemsg={
        'name':seriesName,
        'value':vvla,
@@ -71,6 +78,31 @@ class SinaUsstock(object):
return seriesList
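A worked example of the conversion introduced here: the unit suffix and any thousands separators are stripped, then the number is scaled with Decimal instead of being passed through eval(), which avoids both evaluating scraped strings and float rounding. A self-contained sketch mirroring the replacement logic above (input values are illustrative):

from decimal import Decimal

def parse_cn_amount(value: str) -> str:
    # mirrors the logic above: strip the unit and commas, then scale with Decimal
    if '亿' in value:
        value = value.replace('亿', '').replace(',', '')
        value = Decimal(value) * Decimal('100000000')
    elif '万' in value:
        value = value.replace('万', '').replace(',', '')
        value = Decimal(value) * Decimal('10000')
    return str(value).replace(',', '')

print(parse_cn_amount('1.5亿'))    # 150000000.0
print(parse_cn_amount('3,200万'))  # 32000000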
# 判断股票代码是否存在
def check_code(self,com_code):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn',db=3)
res = r.exists('com_sinacaiwushuju_code::'+com_code)
#如果key存在 则不是第一次采集该企业, res = 1
if res:
return False #表示不是第一次采集
else:
return True #表示是第一次采集
def check_date(self,com_code,info_date):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
res = r.sismember('com_sinacaiwushuju_code::'+com_code, info_date) # 注意是 保存set的方式
if res:
return True
else:
return False
# 将采集后的股票代码对应的报告期保存进redis
def add_date(self,com_code,date_list):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn',db=3)
#遍历date_list 放入redis
for date in date_list:
res = r.sadd('com_sinacaiwushuju_code::'+com_code,date)
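Taken together, these three helpers keep one redis set per ticker that remembers which report periods have already been pushed; a compact sketch of the intended flow, with the connection parameters copied from the methods above and the ticker/date made up:

import redis

r = redis.Redis(host='114.115.236.206', port=6379, password='clbzzsn', db=3)

com_code, info_date = 'AAPL', '2022-12-31'
key = 'com_sinacaiwushuju_code::' + com_code

if not r.exists(key):
    # first collection for this ticker: downstream payload gets ynFirst=True
    print('first collection for', com_code)
elif r.sismember(key, info_date):
    # this report period was already pushed, so it can be skipped
    print('already collected', com_code, info_date)

# after a successful push, remember the report period
r.sadd(key, info_date)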
def getCodeFromRedis(self):
    securitiescode=self.r.lpop('sina_usstock:securities_code')
    securitiescode = securitiescode.decode('utf-8')
@@ -209,7 +241,7 @@ class SinaUsstock(object):
#转换数据格式发送接口
annualzb=zbl1+zbl3+zbl5
-annualzb=self.groupZbData(annualzb,stock,social_credit_code,'annual')
+annualzb=self.groupZbData(annualzb,stock,social_credit_code,'year')
self.sendToFinance(annualzb)
quarterzb=zbl2+zbl4+zbl6
quarterzb=self.groupZbData(quarterzb,stock,social_credit_code,'quarter')
@@ -228,15 +260,26 @@ class SinaUsstock(object):
def sendToFinance(self,zbmsg):
    for zbb in zbmsg:
+        com_code=zbb['securitiesCode']
+        com_date=zbb['date']
+        #判断股票代码是否采集过
+        if self.check_code(com_code):
+            zbb['ynFirst']=True
        if len(zbb) != 0:
            # 调凯歌接口存储数据
            data = json.dumps(zbb)
            #暂无接口
-            url_baocun = ''
+            url_baocun = 'http://114.115.236.206:8088/sync/finance/sina'
            # url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
            for nnn in range(0, 3):
                try:
                    res_baocun = requests.post(url_baocun, data=data)
+                    #将采集到的股票代码和日期进行记录用来标记是否采集过
+                    com_date_list=[]
+                    com_date_list.append(com_date)
+                    self.add_date(com_code,com_date)
                    self.logger.info(res_baocun.text)
                    break
                except:
@@ -309,7 +352,7 @@ class SinaUsstock(object):
if __name__ == '__main__':
    sinaUsstock=SinaUsstock()
    # securitiescode= sinaUsstock.r.lpop('sina_usstock:securities_code')
-    securitiescode= sinaUsstock.getCodeFromRedis()
+    # securitiescode= sinaUsstock.getCodeFromRedis()
    securitiescode='AAPL'
    try:
        sinaUsstock.get_content2(securitiescode)
......
@@ -541,7 +541,10 @@ class BaseCore:
self.cursor.execute(query)
token_list = self.cursor.fetchall()
self.cnx.commit()
-token = token_list[random.randint(0, len(token_list)-1)][0]
+try:
+    token = token_list[random.randint(0, len(token_list)-1)][0]
+except:
+    token = ''
return token
# 删除失效的token
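The try/except added here guards against an empty token_list (fetchall returning no rows). An equivalent, slightly more explicit sketch of the same fallback:

import random

# assumes token_list is the result of cursor.fetchall() as above
token = random.choice(token_list)[0] if token_list else ''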
......
# -*- coding: utf-8 -*-
import time
from urllib.parse import quote
import requests
import urllib3
from BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
# headers = {
# 'Host': 'xcx.qcc.com',
# 'Connection': 'keep-alive',
# 'Qcc-Platform': 'mp-weixin',
# 'Qcc-Timestamp': '',
# 'Qcc-Version': '1.0.0',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
# 'content-type': 'application/json',
# 'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
# 'Accept-Encoding': 'gzip, deflate, br,'
# }
headers = {
'Host': 'xcx.qcc.com',
'Connection': 'keep-alive',
'x-request-device-type': 'Android',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF XWEB/8391',
'Content-Type': 'application/json',
'Qcc-Version': '1.0.0',
'authMini': 'Bearer f51dae1a2fcb109fa9ec58bd4a85e5c5',
'xweb_xhr': '1',
'xcx-version': '2023.09.27',
'Qcc-Platform': 'mp-weixin',
'Qcc-CurrentPage': '/company-subpackages/business/index',
'Qcc-Timestamp': '1696661787803',
'Qcc-RefPage': '/company-subpackages/detail/index',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Referer': 'https://servicewechat.com/wx395200814fcd7599/307/page-frame.html',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh'
}
# 通过企业名称或信用代码获取企查查id
def find_id_by_name(start,token,name):
urllib3.disable_warnings()
qcc_key = name
t = str(int(time.time()) * 1000)
headers['Qcc-Timestamp'] = t
url = f"https://xcx.qcc.com/mp-weixin/forwardApp/v3/base/advancedSearch?token={token}&t={t}&pageIndex=1&needGroup=yes&insuredCntStart=&insuredCntEnd=&startDateBegin=&startDateEnd=&registCapiBegin=&registCapiEnd=&countyCode=&province=&sortField=&isSortAsc=&searchKey={quote(qcc_key)}&searchIndex=default&industryV3="
for lll in range(1, 6):
try:
resp_dict = requests.get(url=url, headers=headers, verify=False).json()
break
except Exception as e:
print(f'{e}-------------重试')
time.sleep(5)
continue
time.sleep(2)
#{'status': 40101, 'message': '无效的sessionToken!'} {'status': 401, 'message': '您的账号访问超频,请升级小程序版本'}
if resp_dict['status']==40101:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
if resp_dict['status']==401:
KeyNo = False
log.info(f'=======您的账号访问超频,请升级小程序版本=====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
try:
if resp_dict['result']['Result']:
result_dict = resp_dict['result']['Result'][0]
KeyNo = result_dict['KeyNo']
Name = result_dict['Name'].replace('<em>', '').replace('</em>', '').strip()
if Name == '':
KeyNo = 'null'
else:
KeyNo = 'null'
except:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start,time.time())}')
return KeyNo
log.info("{},企业代码为:{}".format(qcc_key, KeyNo))
return KeyNo
\ No newline at end of file
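A usage sketch for find_id_by_name, assuming a token obtained elsewhere (for example from the token table that BaseCore manages); the company name and token value below are placeholders:

import time

token = 'f51dae1a2fcb109fa9ec58bd4a85e5c5'  # placeholder; normally pulled from the token pool
start = time.time()

KeyNo = find_id_by_name(start, token, '小米科技有限责任公司')
if KeyNo is False:
    print('token 失效或访问超频,需要更换 token 重试')
elif KeyNo == 'null':
    print('企查查没有检索到该企业')
else:
    print('企查查 KeyNo:', KeyNo)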
@@ -541,7 +541,10 @@ class BaseCore:
self.cursor.execute(query)
token_list = self.cursor.fetchall()
self.cnx.commit()
-token = token_list[random.randint(0, len(token_list)-1)][0]
+try:
+    token = token_list[random.randint(0, len(token_list)-1)][0]
+except:
+    token = ''
return token
# 删除失效的token
......
@@ -11,24 +11,28 @@ import logbook.more
import pandas as pd
import requests
import zhconv
-import pymysql
import redis
-from docx import Document
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from openpyxl import Workbook
import langid
#创建连接池
import pymysql
-from pymysql import connections
from DBUtils.PooledDB import PooledDB
# import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client')
from fdfs_client.client import get_tracker_conf, Fdfs_client
-tracker_conf = get_tracker_conf('E:\\kkwork\\zzsn_spider\\comData\\policylaw\\client.conf')
+tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\comData\\policylaw\\client.conf')
client = Fdfs_client(tracker_conf)
from obs import ObsClient
import fitz
from urllib.parse import unquote
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
@@ -437,9 +441,9 @@ class BaseCore:
#解析word文件页数
-def doc_page(self,file_path):
-    doc = Document(file_path)
-    return len(doc.sections)
+# def doc_page(self,file_path):
+#     doc = Document(file_path)
+#     return len(doc.sections)
def pdf_content(self,resp_content):
    # 解析pdf文件内容
    content = ''
@@ -507,9 +511,9 @@ class BaseCore:
# retData['page_size'] = page_size
return retData
-def secrchATT(self,item_id,file_name,type_id):
-    sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s '''
-    self.cursor_.execute(sel_sql, (item_id, file_name, type_id))
+def secrchATT(self,item_id,file_name,type_id,order_by):
+    sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
+    self.cursor_.execute(sel_sql, (item_id, file_name, type_id,order_by))
    selects = self.cursor_.fetchone()
    return selects
@@ -527,26 +531,81 @@ class BaseCore:
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
-selects = self.secrchATT(item_id,file_name,type_id)
-if selects:
-    self.getLogger().info(f'com_name:{com_name}已存在')
-    id = selects[0]
-    return id,full_path
-else:
-    Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
-    values = (
-        file_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
-        status, create_by,
-        create_time)
-    self.cursor_.execute(Upsql, values)  # 插入
-    self.cnx_.commit()  # 提交
-    self.getLogger().info("更新完成:{}".format(Upsql))
-    selects = self.secrchATT(item_id,file_name,type_id)
-    id = selects[0]
-    return id,full_path
+Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+values = (
+    file_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
+    status, create_by,
+    create_time)
+self.cursor_.execute(Upsql, values)  # 插入
+self.cnx_.commit()  # 提交
+self.getLogger().info("更新完成:{}".format(Upsql))
+selects = self.secrchATT(item_id,file_name,type_id,order_by)
+id = selects[0]
+return id,full_path

# 获取文件大小
def convert_size(self,size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units) - 1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(self,file_href,item_id,pathType,file_name):
headers = {}
category = os.path.splitext(file_href)[1]
retData = {'state': False, 'type_id': 7, 'item_id': item_id, 'group_name': 'group1', 'path': '',
'full_path': '',
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(file_href, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
for i in range(0, 3):
try:
# name = file_name
if category in file_name:
pass
else:
file_name = file_name + '.' + category
result = obsClient.putContent('zzsn', f'{pathType}' + file_name, content=response.content)
break
except:
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = self.convert_size(file_size)
retData['create_time'] = time_now
except Exception as e:
print(f'error:{e}')
return retData
return retData
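One thing to note about this variant: page_size is initialised to 0 and never updated (attachments here are not necessarily PDFs), so the `if page_size < 1` branch returns before state is ever set to True. A hedged sketch of a guard keyed on the upload outcome instead, written as a small standalone helper rather than the method's exact code:

import time

def try_upload(put_content, attempts=3, backoff_seconds=3):
    """Return (ok, result); ok is True only if put_content() succeeded once."""
    for _ in range(attempts):
        try:
            return True, put_content()
        except Exception:
            time.sleep(backoff_seconds)
    return False, None

# inside uptoOBS this would be used roughly as:
# ok, result = try_upload(lambda: obsClient.putContent('zzsn', pathType + file_name, content=response.content))
# if not ok:
#     return retData   # state stays False only when the upload really failed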
......
function r(size){
var str = "",
arr = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'];
for(var i=0; i<size; i++){
str += arr[Math.round(Math.random() * (arr.length-1))];
}
return str;
}
function strTobinary(str) {
var result = [];
var list = str.split("");
for (var i = 0; i < list.length; i++) {
if (i != 0) {
result.push(" ");
}
var item = list[i];
var binaryStr = item.charCodeAt().toString(2);
result.push(binaryStr);
};
return result.join("");
}
function cipher() {
var date = new Date();
var timestamp = date.getTime().toString();
var salt = r(24);
var year = date.getFullYear().toString();
var month = (date.getMonth() + 1 < 10 ? "0" + (date.getMonth() + 1) : date
.getMonth()).toString();
var day = (date.getDate() < 10 ? "0" + date.getDate() : date.getDate())
.toString();
var iv = year + month + day;
return salt
}
function des(salt,iv,enc) {
// var enc = des3(timestamp, salt, iv).toString();
var str = salt + iv + enc;
var ciphertext = strTobinary(str);
return ciphertext;
}
function token(){
var size = 24
var str = "",
arr = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'];
for(var i=0; i<size; i++){
str += arr[Math.round(Math.random() * (arr.length-1))];
}
return str;
}
function pageid() {
var n = 32
var text = "";
var possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
for (var i = 0; i < n; i++)
text += possible.charAt(Math.floor(Math.random() * possible.length));
return text;
}
// console.log(cipher());
\ No newline at end of file