Commit a98a7ff0  Author: LiuLiYuan

lly 01/11

Parent 08e4725c
# -*- coding: UTF-8 -*-
import os
import pandas as pd
from obs import ObsClient
from base import BaseCore
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
from flask import Flask, request, jsonify
from gevent import pywsgi
app = Flask(__name__)
# 获取excel表格,解析内容送入Redis
@app.route('/enterprise/readExcel', methods=['GET'])
def getExcel():
baseCore = BaseCore.BaseCore()
r = baseCore.r
log = baseCore.getLogger()
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
filePath = request.args.get('filePath')
data = getDF(filePath, obsClient)
for i in range(data.shape[0]):
data_ = ''
for j in range(len(data.iloc[i])):
if pd.isna(data.iloc[i][j]):
data_ += f'|'
else:
data_ += f'{data.iloc[i][j]}|'
if data.iloc[i]['国内外(1-国内;0-国外)'] == 1 or data.iloc[i]['国内外(1-国内;0-国外)'] == '1':
r.rpush('BaseInfoEnterprise:gnqy_socialCode', data_)
elif data.iloc[i]['国内外(1-国内;0-国外)'] == 0 or data.iloc[i]['国内外(1-国内;0-国外)'] == '0':
r.rpush('BaseInfoEnterprise:gwqy_socialCode', data_)
r.rpush('BaseInfoEnterprise:gnqy_socialCode', 'end')
r.rpush('BaseInfoEnterprise:gwqy_socialCode', 'end')
log.info('文件接收成功')
obsClient.close()
baseCore.close()
return jsonify({'success': 'success'})
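# --- Illustrative usage sketch (not part of the original commit) ---
# A client could trigger the /enterprise/readExcel route above like this.
# The host, port and OBS object key below are assumptions for the example only.
def _example_call_read_excel(file_path, host='127.0.0.1', port=5000):
    import requests  # local import so the sketch stays self-contained
    resp = requests.get(f'http://{host}:{port}/enterprise/readExcel',
                        params={'filePath': file_path}, timeout=60)
    return resp.json()
# e.g. _example_call_read_excel('enterprise/company_list.xlsx')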
@app.route('/enterprise/getInfo', methods=['GET'])
def getInfo():
baseCore = BaseCore.BaseCore()
cursor = baseCore.cursor
cnx = baseCore.cnx
log = baseCore.getLogger()
gpdm = request.args.get('securitiesCode')
xydm = request.args.get('socialCode')
name = request.args.get('name')
ename = request.args.get('englishName')
place = request.args.get('place')
if not xydm and not place :
baseCore.close()
return jsonify({'error':'缺少参数'})
if not name or not ename:
baseCore.close()
return jsonify({'error':'缺少参数'})
sql = f"select * from EnterpriseInfo where SocialCode='{xydm}'"
cursor.execute(sql)
data = cursor.fetchone()
if data:
if str(place) == '1':
if gpdm:
sql_up = f"UPDATE EnterpriseInfo set SecuritiesCode='{gpdm}',CompanyName='{name}',Place={place} WHERE SocialCode='{xydm}'"
cursor.execute(sql_up)
cnx.commit()
log.info(f'更新企业信息==={xydm}')
else:
sql_up = f"UPDATE EnterpriseInfo set CompanyName='{name}',Place={place} WHERE SocialCode='{xydm}'"
cursor.execute(sql_up)
cnx.commit()
log.info(f'更新企业信息==={xydm}')
elif str(place) == '0':
if gpdm:
sql_up = f"UPDATE EnterpriseInfo set SecuritiesCode='{gpdm}',CompanyName='{ename}',Place={place} WHERE SocialCode='{xydm}'"
cursor.execute(sql_up)
cnx.commit()
log.info(f'更新企业信息==={xydm}')
else:
log.error(f'{xydm}===国内外标志错误')
baseCore.close()
return jsonify({'error':f'{xydm}===国内外标志错误'})
else:
if str(place) == '1':
if gpdm:
sql_up = f"INSERT INTO EnterpriseInfo (SocialCode,SecuritiesCode,CompanyName,Place) VALUE ('{xydm}','{gpdm}','{name}','{place}')"
cursor.execute(sql_up)
cnx.commit()
log.info(f'新增企业信息==={xydm}')
else:
sql_up = f"INSERT INTO EnterpriseInfo (SocialCode,CompanyName,Place) VALUE ('{xydm}','{name}','{place}')"
cursor.execute(sql_up)
cnx.commit()
log.info(f'新增企业信息==={xydm}')
elif str(place) == '0':
if gpdm:
sql_up = f"INSERT INTO EnterpriseInfo (SocialCode,SecuritiesCode,CompanyName,Place) VALUE ('{xydm}','{gpdm}','{ename}','{place}')"
cursor.execute(sql_up)
cnx.commit()
log.info(f'新增企业信息==={xydm}')
else:
log.error(f'{xydm}===国内外标志错误')
baseCore.close()
return jsonify({'error':f'{xydm}===国内外标志错误'})
baseCore.close()
return jsonify({'success': 'success'})
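# --- Illustrative sketch (assumption: baseCore.cursor is a DB-API cursor that
# accepts %s placeholders, e.g. pymysql). The f-string SQL above is open to SQL
# injection; a parameterised variant of the update branch would look like this:
def _example_update_enterprise(cursor, cnx, xydm, name, place, gpdm=None):
    if gpdm:
        cursor.execute(
            "UPDATE EnterpriseInfo SET SecuritiesCode=%s, CompanyName=%s, Place=%s WHERE SocialCode=%s",
            (gpdm, name, place, xydm))
    else:
        cursor.execute(
            "UPDATE EnterpriseInfo SET CompanyName=%s, Place=%s WHERE SocialCode=%s",
            (name, place, xydm))
    cnx.commit()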
def getDF(filePath, obsClient):
response = obsClient.getObject('zzsn', filePath, loadStreamInMemory=True)
df = pd.read_excel(response['body']['buffer'])
return df
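# --- Illustrative variant (assumption): pandas' read_excel expects a path or a
# file-like object, so wrapping the in-memory OBS buffer in BytesIO is the safer
# form across pandas versions:
def _example_getDF_bytesio(filePath, obsClient):
    import io
    response = obsClient.getObject('zzsn', filePath, loadStreamInMemory=True)
    return pd.read_excel(io.BytesIO(response['body']['buffer']))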
if __name__ == '__main__':
server = pywsgi.WSGIServer(('0.0.0.0',5000),app)
server.serve_forever()
@@ -124,9 +124,6 @@ def doJob():
except Exception as e:
log.error(f'第{page}页==={title}===失败')
time.sleep(2)
df = pd.DataFrame(np.array(info_list))
df.columns = ['证券代码', '证券简称', '公告标题', '发布时间', '公告网址', '来源', '来源网址']
df.to_excel('./市场板块/深圳交易所基金公告_2.xlsx', index=False)
if __name__ == '__main__':
import datetime
import time
import pandas as pd
import pymongo
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from retry import retry
from requests.packages.urllib3 import disable_warnings
import BaseCore
disable_warnings()
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'RETIsProdQuot']
db_storage_PO = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'REITsProdOverview']
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0'
}
@retry(tries=5, delay=20)
def getCodeList():
code_list = []
url = 'https://api.sgx.com/securities/v1.1?excludetypes=bonds&params=nc%2Cadjusted-vwap%2Cbond_accrued_interest%2Cbond_clean_price%2Cbond_dirty_price%2Cbond_date%2Cb%2Cbv%2Cp%2Cc%2Cchange_vs_pc%2Cchange_vs_pc_percentage%2Ccx%2Ccn%2Cdp%2Cdpc%2Cdu%2Ced%2Cfn%2Ch%2Ciiv%2Ciopv%2Clt%2Cl%2Co%2Cp_%2Cpv%2Cptd%2Cs%2Csv%2Ctrading_time%2Cv_%2Cv%2Cvl%2Cvwap%2Cvwap-currency'
req = requests.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
data_json = req.json()['data']['prices']
for data in data_json:
if data['type'] == 'reits':
code = data['nc']
code_list.append(code)
req.close()
return code_list
def getDataJson(code):
url = f'https://api.sgx.com/securities/v1.1/charts/historic/reits/code/{code}/1w'
req = requests.get(url,headers=headers,verify=False)
req.encoding = req.apparent_encoding
data_json = req.json()['data']['historic']
req.close()
return data_json
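# --- Illustrative helper (sketch): each record in the SGX historic payload carries
# a trading_time such as "20231201_170000"; doJob below parses it with the same
# format string. The sample value here is illustrative only.
def _example_parse_trading_time(trading_time='20231201_170000'):
    return datetime.datetime.strptime(trading_time, '%Y%m%d_%H%M%S')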
def doJob():
code_list = getCodeList()
for code in code_list:
data_json = getDataJson(code)
for data_ in data_json:
date = datetime.datetime.strptime(data_['trading_time'], '%Y%m%d_%H%M%S')
country = db_storage_PO.find_one({'code':code,'exchange':'新加坡交易所'})['country']
is_insert = db_storage.find_one({'code':code,'date':date})
if is_insert:
log.info(f'{code}===已采集')
continue
name = data_['n']
opening = data_['o']
max = data_['h']
min = data_['l']
closed = data_['lt']
ytdClosed = data_['pv']
volume = float(data_['vl'])*1000
amount = data_['v']
dic_info = {
'code': code, # 代码
'shortName': name, # 简称
'opening': float(opening), # 开盘价
'max': float(max), # 最高价
'min': float(min), # 最低价
'closed': float(closed), # 收盘价
'ytdClosed': float(ytdClosed), # 前收价
'volume': float(volume), # 交易量
'amount': float(amount), # 交易金额
'totalValue': '', # 市价总值
'negoValue': '', # 流通总值
'toRate': '', # 换手率
'date': date, # 时间
'strDate': str(date)[:10],
'country': country, # 国家
'exchange': '新加坡交易所', # 交易所
'currency':'SGD'# 币种
}
try:
db_storage.insert_one(dic_info)
log.info(f'{code}==={name}==={date}===采集成功')
except:
log.error(f'{code}==={name}==={date}===保存失败')
time.sleep(2)
def task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每天执行一次
scheduler.add_job(doJob, 'cron', hour='18', minute=0, max_instances=2 )
try:
scheduler.start()
except Exception as e:
log.error(f'定时采集异常:{e}')
pass
if __name__ == '__main__':
doJob()
#task()
import datetime
import json
import re
import time
import pymongo
import requests
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
headers = {
'Accept':'*/*',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
}
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'RETIsProdQuot']
def getList():
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'Accept': '*/*',
'Referer': 'https://www.hkex.com.hk/'
}
url = f'https://www1.hkex.com.hk/hkexwidget/data/getreitfilter?lang=chi&token=evLtsLsBNAUVTPxtGqVeGyhVsyUra3j5V4IkrZKhkiaD97FD%2bFaZmy%2f2JkHt6s0m&sort=5&order=0&qid={int(time.time())*1000}&callback=jQuery3510399252964830082_1702532830368&'
req = requests.get(url,headers=headers)
req.encoding = req.apparent_encoding
data_json = re.findall(r'\((.*?)\)', req.text)[0]
data_json = json.loads(data_json)['data']['stocklist']
req.close()
return data_json
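# --- Illustrative helper (sketch): both getList above and getDataJson below strip
# a JSONP wrapper (e.g. "jQuery3510..._1702...(...)") with a regex before calling
# json.loads; a shared helper keeps the two call sites consistent.
def _example_unwrap_jsonp(text):
    body = re.findall(r'\((.*)\)', text, re.S)[0]  # greedy: keep everything inside the outer parentheses
    return json.loads(body)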
def getDataJson(ric):
url = f'https://www1.hkex.com.hk/hkexwidget/data/getchartdata2?hchart=1&span=6&int=6&ric={ric}&token=evLtsLsBNAUVTPxtGqVeG1W4%2fDlQ5x7gUilJ3XOCHQYqDv29Qh%2f5bHDJA2BUz3YL&qid={int(time.time())*1000}&callback=jQuery35105943072391434097_1702533112873'
req = requests.get(url,headers=headers)
req.encoding = req.apparent_encoding
data_list = json.loads(re.findall(r'\((.*?)\)',req.text)[0])['data']['datalist']
return data_list
# https://www.hkex.com.hk/Market-Data/Securities-Prices/Real-Estate-Investment-Trusts?sc_lang=zh-HK
def doJob():
try:
info_json = getList()
except:
log.error(f'列表信息获取失败')
return
for info in info_json:
code = info['sym']
name = baseCore.hant_2_hans(info['nm'])
ric = info['ric']
currency = info['ccy']
try:
data_list = getDataJson(ric)
except:
log.error(f'{code}==={name}===数据获取失败')
continue
data_list = data_list[-11:]
# for i in range(len(data_list)):
for data in data_list:
date = datetime.datetime.fromtimestamp(int(data[0])/1000)
opening = data[1]
if not opening:
continue
is_insert = db_storage.find_one({'code':code,'strDate':str(date)[:10],'exchange':'香港交易所'})
if is_insert:
log.info(f'{code}==={name}===已采集')
continue
# if i == 1:
# ytdClosed = 0
# else:
if date.isoweekday() == 1:
yesterday = date - datetime.timedelta(days=3)
else:
yesterday = date - datetime.timedelta(days=1)
while True:
try:
ytdClosed = db_storage.find_one({'code':code,'strDate':f'{str(yesterday)[:10]}','exchange':'香港交易所'})['closed']
break
except:
yesterday = yesterday - datetime.timedelta(days=1)
max = data[2]
min = data[3]
closed = data[4]
volume = data[5]
amount = data[6]
dic_info = {
'code': code, # 代码
'shortName': name, # 简称
'opening': float(opening), # 开盘价
'max': float(max), # 最高价
'min': float(min), # 最低价
'closed': float(closed), # 收盘价
'ytdClosed': float(ytdClosed), # 前收价
'volume': float(volume), # 交易量
'amount': float(amount), # 交易金额
'totalValue': '', # 市价总值
'negoValue': '', # 流通总值
'toRate': '', # 换手率
'date': date, # 时间
'strDate': str(date)[:10],
'country': '中国', # 国家
'exchange': '香港交易所', # 交易所
'currency':currency# 币种
}
try:
db_storage.insert_one(dic_info)
log.info(f'{code}==={name}==={date}===采集成功')
except:
log.error(f'{code}==={name}==={date}===入库失败')
time.sleep(1)
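# --- Illustrative sketch (assumption): a bounded variant of the previous-close
# lookup used inside doJob above. The original "while True" walks back one day at a
# time and would never terminate for a code with no history at all; capping the
# search avoids that.
def _example_find_prev_close(code, trade_date, max_back_days=15):
    day = trade_date
    for _ in range(max_back_days):
        day = day - datetime.timedelta(days=1)
        doc = db_storage.find_one({'code': code, 'strDate': str(day)[:10], 'exchange': '香港交易所'})
        if doc:
            return doc['closed']
    return None  # caller decides how to handle a missing previous close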
if __name__ == '__main__':
doJob()
baseCore.close()
import re
import time
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import pymongo
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from retry import retry
from base import BaseCore
baseCore = BaseCore.BaseCore()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'RETIsProdQuot']
log = baseCore.getLogger()
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Host': 'www.szse.cn',
'Pragma': 'no-cache',
'Referer': 'http://www.szse.cn/market/product/list/all/index.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'X-Request-Type': 'ajax',
'X-Requested-With': 'XMLHttpRequest',
}
# 获取基金代码与上市时间
@retry(tries=3, delay=3)
def getData():
data_list = []
# ip = baseCore.get_proxy()
url = 'https://reits.szse.cn/api/report/ShowReport/data?SHOWTYPE=JSON&CATALOGID=reits_fund_list&PAGENO=1&PAGESIZE=10'
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
data_json = req.json()[0]['data']
for data_ in data_json:
jjjcurl = re.findall('<u>(.*?)</u>', data_['jjjcurl'])[0].lstrip().strip()
sys_key = data_['sys_key'].lstrip().strip()
ssrq = data_['ssrq'].lstrip().strip()
# 基金简称 基金代码 上市时间
data = [jjjcurl, sys_key, ssrq]
data_list.append(data)
req.close()
return data_list
# 获取基金交易信息
@retry(tries=3, delay=20)
def getDataList(code, start_date, end_date):
ip = baseCore.get_proxy()
archiveDate = str(datetime.today().year) + '-'+ str(datetime.today().month) + '-' + '01'
url = f'http://www.szse.cn/api/report/ShowReport/data?SHOWTYPE=JSON&CATALOGID=1815_stock_snapshot&TABKEY=tab2&txtDMorJC={code}&txtBeginDate={str(start_date)[:10]}&txtEndDate={str(end_date)[:10]}&archiveDate={archiveDate}'
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
data_json = req.json()[0]['data'][::-1]
req.close()
for data_ in data_json:
jyrq = data_['jyrq']
zqdm = data_['zqdm']
zqjc = data_['zqjc']
qss = data_['qss'].replace(',', '')
ks = data_['ks'].replace(',', '')
zg = data_['zg'].replace(',', '')
zd = data_['zd'].replace(',', '')
ss = data_['ss'].replace(',', '')
cjgs = data_['cjgs'].replace(',', '')
cjje = data_['cjje'].replace(',', '')
jyrq = datetime.strptime(jyrq, '%Y-%m-%d')
is_insert = db_storage.find_one({'code': zqdm, 'date': jyrq, 'exchange': '深圳证券交易所'})
if is_insert:
log.info(f'{code}==={jyrq}===已采集')
continue
dic_info = {
'code': zqdm, # 代码
'shortName': zqjc, # 简称
'opening': float(ks), # 开盘价
'max': float(zg), # 最高价
'min': float(zd), # 最低价
'closed': float(ss), # 收盘价
'ytdClosed': float(qss), # 前收价
'volume': float(cjgs), # 交易量
'amount': float(cjje), # 交易金额
'totalValue': '', # 市价总值
'negoValue': '', # 流通总值
'toRate': '', # 换手率
'date': jyrq, # 时间
'strDate' : str(jyrq)[:10], # 字符串 时间
'country': '中国', # 国家
'exchange': '深圳证券交易所', # 交易所
"currency": "CNY" # 币种
}
db_storage.insert_one(dic_info)
log.info(f'{code}==={jyrq}===采集成功')
time.sleep(3)
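# --- Illustrative sketch (assumption): the archiveDate built above by string
# concatenation yields e.g. "2024-1-01" for January; if the SZSE endpoint expects a
# zero-padded month, strftime gives the canonical form.
def _example_archive_date(today=None):
    today = today or datetime.today()
    return today.replace(day=1).strftime('%Y-%m-%d')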
def doJob():
try:
data_list = getData()
except Exception as e:
log.error(f'基金列表获取失败==={e}')
return
log.info('开始采集')
for data in data_list:
name = data[0]
code = data[1]
log.info(f'{code}==={name}===开始采集')
# start_date = data[2]
# start_date = datetime.strptime(start_date, "%Y-%m-%d")
current_date = datetime.now()
start_date = current_date + timedelta(days=-5)
# end_date = start_date + timedelta(days=5)
# while end_date != current_date:
# time.sleep(1)
try:
# getDataList(code, start_date, end_date)
getDataList(code,start_date,current_date)
except Exception as e:
log.error(f'{code}==={start_date}-{current_date}===采集失败==={e}')
# start_date = end_date + timedelta(days=1)
# end_date = start_date + timedelta(days=5)
# if end_date > current_date:
# end_date = current_date
def task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每天执行一次
scheduler.add_job(doJob, 'cron', hour='18', minute=0, max_instances=2 )
try:
scheduler.start()
except Exception as e:
log.error(f'定时采集异常:{e}')
pass
if __name__ == '__main__':
# doJob()
task()
import json
import re
import time
import calendar
import pymongo
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from retry import retry
import BaseCore
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'REITsTxnStat']
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
headers = {
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Host': 'query.sse.com.cn',
'Pragma': 'no-cache',
'Referer': 'http://www.sse.com.cn/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
}
@retry(tries=5, delay=20)
def getJson(url):
# ip = baseCore.get_proxy()
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
data_json = re.findall(r'\((.*)\)', req.text)[0]
data_json = json.loads(data_json)
req.close()
return data_json
# 2021-06-26
# 每日概况
def getDayData():
# start_date = datetime(2021, 6, 21)
start_date = datetime.today() - timedelta(days=5)
end_date = datetime.today() - timedelta(days=1)
date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
for date in date_range:
date_ = date.strftime('%Y-%m-%d')
url = f'http://query.sse.com.cn/commonQuery.do?jsonCallBack=jsonpCallback89728&sqlId=COMMON_SSE_REITS_HQXX_CJTJ_DAY_L&TRADE_DATE={date_}&FUND_TYPE=01&_={int(time.time())}'
try:
data_json = getJson(url)['result']
if len(data_json) == 0:
continue
data_json = data_json[0]
except Exception as e:
log.error(f'{date}===连接失败==={e}')
time.sleep(3)
continue
is_insert = db_storage.find_one({'strDate': str(date)[:10], 'exchange': '上海证券交易所'})
if is_insert:
log.info(f'{date}===已采集')
time.sleep(3)
continue
dic_info = {
'number': int(data_json['LIST_NUM']), # 挂牌数
'volume': float(data_json['TRADE_VOL']) * 10000, # 成交量
'amount': float(data_json['TRADE_AMT']) * 10000, # 成交金额
'totalValue': float(data_json['TOTAL_VALUE']) * 10000, # 市价总额
'negoValue': float(data_json['NEGO_VALUE']) * 10000, # 流通市值
'toRate': float(data_json['TO_RATE']), # 换手率
'date': date,
'strDate':str(date)[:10],
'country': '中国',
'exchange': '上海证券交易所',
'currency': 'CNY', # 币种
}
try:
db_storage.insert_one(dic_info)
log.info(f'{date}===采集成功')
except Exception as e:
log.error(f'{date}===数据存储失败==={e}')
time.sleep(3)
# 每周概况
def getWeekData(writer):
data_list = []
start_date = datetime(2021, 6, 21)
end_date = datetime.today()
date_range = [start_date + timedelta(days=x) for x in range(0, (end_date - start_date).days + 1, 7)]
for date_1 in date_range:
date_2 = (date_1 + timedelta(days=6)).strftime('%Y-%m-%d')
date_1 = date_1.strftime('%Y-%m-%d')
url = f'http://query.sse.com.cn/commonQuery.do?jsonCallBack=jsonpCallback65413&sqlId=COMMON_SSE_REITS_HQXX_CJTJ_WEEK_L&START_DATE={date_1}&END_DATE={date_2}&FUND_TYPE=01&_={int(time.time())}'
data_json = getJson(url)['result']
for data_ in data_json:
data = [data_['LIST_NUM'], data_['TRADE_VOL'], data_['TRADE_AMT'], data_['TOTAL_VALUE'],
data_['NEGO_VALUE'], data_['TO_RATE'], f'{date_1}至{date_2}']
dic_info = {
'挂牌数': data_['LIST_NUM'],
'成交量(亿份)': data_['TRADE_VOL'],
'成交金额(亿元)': data_['TRADE_AMT'],
'市价总额(亿元)': data_['TOTAL_VALUE'],
'流通市值(亿元)': data_['NEGO_VALUE'],
'换手率(%)': data_['TO_RATE'],
'日期': f'{date_1}至{date_2}',
'类别': '每周概况'
}
db_storage.insert_one(dic_info)
log.info(f'{date_1}至{date_2}===采集完成')
data_list.append(data)
time.sleep(1)
df = pd.DataFrame(np.array(data_list))
df.columns = ['挂牌数', '成交量(亿份)', '成交金额(亿元)', '市价总额(亿元)', '流通市值(亿元)', '换手率(%)', '日期']
df.to_excel(writer, sheet_name='每周概况', index=False)
# 月度概况
def getMonthData(writer):
data_list = []
start_date = datetime.strptime('2021-06-01', '%Y-%m-%d')
current_date = datetime.now()
while start_date <= current_date:
year = start_date.year
month = start_date.month
date = start_date.strftime('%Y-%m')
url = f'http://query.sse.com.cn/commonQuery.do?jsonCallBack=jsonpCallback76435&sqlId=COMMON_SSE_REITS_HQXX_CJTJ_MONTH_L&TRADE_DATE={date}&FUND_TYPE=01&_={int(time.time())}'
data_json = getJson(url)['result']
for data_ in data_json:
data = [data_['LIST_NUM'], data_['TRADE_VOL'], data_['TRADE_AMT'], data_['TOTAL_VALUE'],
data_['NEGO_VALUE'], data_['TO_RATE'], date]
dic_info = {
'挂牌数': data_['LIST_NUM'],
'成交量(亿份)': data_['TRADE_VOL'],
'成交金额(亿元)': data_['TRADE_AMT'],
'市价总额(亿元)': data_['TOTAL_VALUE'],
'流通市值(亿元)': data_['NEGO_VALUE'],
'换手率(%)': data_['TO_RATE'],
'日期': date,
'类别': '月度概况'
}
db_storage.insert_one(dic_info)
log.info(f'{date}===采集完成')
data_list.append(data)
if month == 12:
start_date = start_date.replace(year=year + 1, month=1)
else:
start_date = start_date.replace(month=month + 1)
time.sleep(1)
df = pd.DataFrame(np.array(data_list))
df.columns = ['挂牌数', '成交量(亿份)', '成交金额(亿元)', '市价总额(亿元)', '流通市值(亿元)', '换手率(%)', '日期']
df.to_excel(writer, sheet_name='每月概况', index=False)
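# --- Illustrative usage sketch (assumption): getWeekData and getMonthData expect an
# already-open pandas ExcelWriter, but only getDayData is scheduled in task() below.
# A one-off export could be driven like this (the output path is an example only):
def _example_export_week_and_month(path='./上交所REITs成交概况.xlsx'):
    with pd.ExcelWriter(path) as writer:
        getWeekData(writer)
        getMonthData(writer)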
def task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每天执行一次
scheduler.add_job(getDayData, 'cron', hour='8', minute=0, max_instances=2 )
try:
scheduler.start()
except Exception as e:
log.error(f'定时采集异常:{e}')
pass
if __name__ == '__main__':
task()
baseCore.close()
import time
from datetime import datetime
from datetime import timedelta
from decimal import Decimal
import pymongo
import requests
from retry import retry
from apscheduler.schedulers.blocking import BlockingScheduler
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'REITsTxnStat']
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Host': 'www.szse.cn',
'Pragma': 'no-cache',
'Referer': 'http://www.szse.cn/market/fund/dealSurvey/daily/index.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'X-Request-Type': 'ajax',
'X-Requested-With': 'XMLHttpRequest'
}
@retry(tries=5, delay=10)
def getJson(url):
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
return req.json()[0]['data']
@retry(tries=5, delay=10)
def getJsonB(date):
# headers = {
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
# }
url = f'http://www.szse.cn/api/report/ShowReport/data?SHOWTYPE=JSON&CATALOGID=1803_sczm&TABKEY=tab1&txtQueryDate={date}'
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
datas = req.json()[0]['data']
for data in datas:
if '基础设施基金' in data['lbmc']:
try:
amount = db_storage.find_one({'exchange': '深圳证券交易所', 'strDate': date})['amount']
except:
log.error(f'{date}===无成交金额')
break
number = int(data['zqsl'])
totalValue = float(Decimal(data['sjzz']) * Decimal('10000'))
negoValue = float(Decimal(data['ltsz']) * Decimal('10000'))
return number,totalValue,negoValue
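# --- Illustrative sketch (assumption): doJob below derives the turnover rate as
# amount / totalValue * 100, using Decimal for the division; as a standalone helper:
def _example_turnover_rate(amount, total_value):
    return float(Decimal(str(amount)) / Decimal(str(total_value)) * Decimal('100'))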
def doJob():
# start_date = datetime(2021, 6, 21)
start_date = datetime.today() - timedelta(days=5)
end_date = datetime.today() - timedelta(days=1)
date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
for date in date_range:
date_ = date.strftime('%Y-%m-%d')
is_insert = db_storage.find_one({'strDate': str(date)[:10], 'exchange': '深圳证券交易所'})
if is_insert:
log.info(f'{date_}===已采集')
time.sleep(3)
continue
url = f'http://www.szse.cn/api/report/ShowReport/data?SHOWTYPE=JSON&CATALOGID=scsj_jjrdgk&TABKEY=tab1&txtQueryDate={date_}&tjzqlb=D'
data_json = getJson(url)
for data in data_json:
if '基础设施基金' in data['lbmc']:
volume = data['cjl'].replace(',', '')
amount = data['cjje'].replace(',', '')
number,totalValue,negoValue = getJsonB(date_)
toRate = float(Decimal(f'{amount}') / Decimal(f'{totalValue}') * Decimal('100'))
dic_info = {
'number': number, # 挂牌数
'volume': float(volume), # 成交量
'amount': float(amount), # 成交金额
'totalValue': totalValue, # 市价总额
'negoValue': negoValue, # 流通市值
'toRate': toRate, # 换手率
'date': date,
'strDate': str(date)[:10],
'country': '中国',
'exchange': '深圳证券交易所',
'currency': 'CNY', # 币种
}
try:
db_storage.insert_one(dic_info)
log.info(f'{date_}===数据采集成功')
except:
log.error(f'{date_}===数据入库失败')
time.sleep(3)
def task():
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每天执行一次
scheduler.add_job(doJob, 'cron', hour='8', minute=0, max_instances=2 )
try:
scheduler.start()
except Exception as e:
log.error(f'定时采集异常:{e}')
pass
if __name__ == '__main__':
task()
# number,totalValue,negoValue = getJsonB('2023-12-27')
# print(number,totalValue,negoValue)
baseCore.close()
import datetime
import pymongo
import requests
from fitz import fitz
import BaseCore
baseCore = BaseCore.BaseCore()
cursor_ = baseCore.cursor_
cnx_ = baseCore.cnx_
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').RESCenter[
'REITsProjDynamics']
datas = db_storage.find({'exchange':'上海证券交易所'})
for data in datas:
mongoId = data['_id']
for a in data['file'].keys():
if data['file'][f'{a}']:
path_1 = data['file'][f'{a}']['declaration']
path_2 = data['file'][f'{a}']['fdbkResp']
path_3 = data['file'][f'{a}']['cover']
if path_1:
# sql = f"select id from clb_sys_attachment where full_path='{path_1}' and type_id=15"
# cursor_.execute(sql)
# id = cursor_.fetchone()
# print(id)
sql = f"update clb_sys_attachment set item_id='{mongoId}' where full_path='{path_1}' and type_id=15"
cursor_.execute(sql)
cnx_.commit()
if path_2:
# sql = f"select id from clb_sys_attachment where full_path='{path_2}'and type_id=15"
# cursor_.execute(sql)
# id = cursor_.fetchone()
# print(id)
sql = f"update clb_sys_attachment set item_id='{mongoId}' where full_path='{path_2}' and type_id=15"
cursor_.execute(sql)
cnx_.commit()
if path_3:
# sql = f"select id from clb_sys_attachment where full_path='{path_3}'and type_id=15"
# cursor_.execute(sql)
# id = cursor_.fetchone()
# print(id)
sql = f"update clb_sys_attachment set item_id='{mongoId}' where full_path='{path_3}' and type_id=15"
cursor_.execute(sql)
cnx_.commit()
import os
import sys
import logbook
import logbook.more
def logFormate(record, handler):
formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
date=record.time, # 日志时间
level=record.level_name, # 日志等级
filename=os.path.split(record.filename)[-1], # 文件名
func_name=record.func_name, # 函数名
lineno=record.lineno, # 行号
msg=record.message # 日志内容
)
return formate
# 获取logger
def getLogger(fileLogFlag=True, stdOutFlag=True):
pid = os.getpid()
dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + f"{pid}.log"
if not os.path.exists(dirname):
os.mkdir(dirname)
logbook.set_datetime_format('local')
logger = logbook.Logger(filename)
logger.handlers = []
if fileLogFlag: # 日志输出到文件
logFile = logbook.TimedRotatingFileHandler(os.path.join(dirname, filename), date_format='%Y-%m-%d',
bubble=True, encoding='utf-8')
logFile.formatter = logFormate
logger.handlers.append(logFile)
if stdOutFlag: # 日志打印到屏幕
logStd = logbook.more.ColorizedStderrHandler(bubble=True)
logStd.formatter = logFormate
logger.handlers.append(logStd)
return logger
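# --- Illustrative usage sketch: getLogger() writes to ./logs/<script name><pid>.log
# and to stderr by default; pass fileLogFlag=False or stdOutFlag=False to disable
# either handler. The messages below are examples only.
def _example_log_usage():
    log = getLogger()
    log.info('logger initialised')
    log.error('something went wrong')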
@@ -5,7 +5,8 @@ import pandas as pd
import requests
from goose3 import Goose
from goose3.text import StopWordsChinese, StopWordsKorean, StopWordsArabic
sys.path.append('D:\\kkwork\\zzsn_spider\\base\\smart')
# sys.path.append('D:\\kkwork\\zzsn_spider\\base\\smart')
sys.path.append(r'F:\zzsn\zzsn_spider\base\smart')
from entity import *
from smart_extractor_utility import SmartExtractorUtility
# goose3自带的lxml,提示找不到etree,但仍可使用
import json
import time
import numpy as np
import pandas as pd
import requests
import urllib3
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import sys
sys.path.append(r'D:\zzsn_spider\base')
import BaseCore
from retry import retry
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
taskType = '企业基本信息/雅虎财经'
baseCore = BaseCore.BaseCore()
cursor = baseCore.cursor
cnx = baseCore.cnx
r = baseCore.r
log = baseCore.getLogger()
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': "Windows",
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
# 保存基本信息
def saveBaseInfo(info, xydm, gpdm,id_):
url_ = info['base_info']['公司网站']
add_ = info['base_info']['地址']
company_dict = [id_,xydm, info['base_info']['英文名'], info['base_info']['电话'], url_, info['base_info']['公司简介'],
info['base_info']['行业'], add_, gpdm]
return company_dict
# 获取请求响应
@retry(tries=5, delay=3)
def getRes(url):
response = requests.get(url, headers=headers, verify=False)
if response.status_code != 200:
raise requests.RequestException(f'unexpected status code: {response.status_code}')
return response
# 根据股票代码 获取企业基本信息
def getInfo(gpdm, start):
if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5:
gpdm_ = str(gpdm)[1:]
else:
gpdm_ = gpdm
elif str(gpdm)[-2:] == '.N' or str(gpdm)[-2:] == '.O':
gpdm_ = gpdm[:-2]
else:
gpdm_ = gpdm
retData = {}
url = f'https://finance.yahoo.com/quote/{gpdm_}/profile?p={gpdm_}'
time.sleep(3)
try:
response = getRes(url)
except:
log.error(f"{gpdm}------访问基本信息页面失败")
return retData,'访问基本信息页面失败'
if 'lookup' in response.url:
log.error(f"{gpdm}------股票代码未查询到信息:{response.status_code}")
return retData,'股票代码未查询到信息'
if url != response.url:
log.error(f'{gpdm}------请求失败')
return retData,'请求失败'
state = 1
soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
if page.text == '' or 'Invalid Date data is not available' in page.text:
state = 0
log.error(f'{gpdm}---没有基本信息')
return retData,'没有基本信息'
try:
name = page.find('h3', {'class': 'Fz(m) Mb(10px)'}).text.lstrip().strip()
except:
log.error(f'{gpdm}------其它错误原因')
return retData,'其它错误原因'
try:
com_info = page.find('div', {'class': 'Mb(25px)'})
except:
com_info = ''
try:
com_phone = com_info.find_all('p')[0].find('a').text.lstrip().strip()
except:
com_phone = ''
try:
com_url = com_info.find_all('p')[0].find('a', {'target': '_blank'}).text.lstrip().strip()
except:
com_url = ''
try:
com_address = ''
com_addressTag = com_info.find_all('p')[0]
a_list = com_addressTag.select('a')
for a in a_list:
a.decompose()
com_addressTag = str(com_addressTag).replace('<br/>', '</p><p>')
com_addressTag = BeautifulSoup(com_addressTag, 'html.parser')
p_list = com_addressTag.select('p')
for p in p_list:
com_address += p.text.lstrip().strip() + ' '
com_address = com_address.lstrip().strip()
except:
com_address = ''
try:
com_bumen = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[0].text.lstrip().strip()
except:
com_bumen = ''
try:
com_hangye = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[1].text.lstrip().strip()
except:
com_hangye = ''
try:
com_people = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[2].text.lstrip().strip()
except:
com_people = ''
try:
com_jianjie = page.find('p', {'class': 'Mt(15px) Lh(1.6)'}).text.lstrip().strip()
except:
com_jianjie = ''
dic_com_info = {
'英文名': name,
'股票代码': gpdm,
'地址': com_address,
'电话': com_phone,
'公司网站': com_url,
'部门': com_bumen,
'行业': com_hangye,
'员工人数': com_people,
'公司简介': com_jianjie
}
retData['base_info'] = dic_com_info
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
response.close()
return retData,'成功'
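# --- Illustrative helper (sketch): the symbol normalisation at the top of getInfo
# (strip the leading zero from 5-digit HK codes, drop ".N"/".O" suffixes) as a
# standalone function, shown here only to make that branch easier to follow.
def _example_normalise_symbol(gpdm):
    gpdm = str(gpdm)
    if 'HK' in gpdm:
        head = gpdm.split('.')[0]
        return gpdm[1:] if len(head) == 5 else gpdm
    if gpdm.endswith('.N') or gpdm.endswith('.O'):
        return gpdm[:-2]
    return gpdm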
# 采集工作
def beginWork():
dic_list = []
error_list1 = []
error_list2 = []
error_list3 = []
writer_ = pd.ExcelWriter(r'D:\zzsn_spider\comData\未采集到企业基本信息_50001-55000.xlsx')
writer = pd.ExcelWriter(r'D:\zzsn_spider\comData\企业基本信息_50001-55000.xlsx')
df = pd.read_excel(r'D:\zzsn_spider\comData\雅虎财经上市企业信息采集50001-55000_20231215.xlsx',sheet_name='yahoostock')
# xydm_list = df['信用代码']
gpdm_list = df['symbol']
id_list = df['id']
for i in range(len(gpdm_list)):
gpdm = gpdm_list[i]
id_ = id_list[i]
if not gpdm or gpdm == '':
continue
info,exc = getInfo(gpdm, time.time())
if info:
dic = saveBaseInfo(info, '', gpdm,id_)
dic_list.append(dic)
else:
if exc == '股票代码未查询到信息':
error_list1.append([id_,gpdm])
elif exc == '没有基本信息':
error_list2.append([id_,gpdm])
elif exc == '其它错误原因':
error_list3.append([id_,gpdm])
df_ = pd.DataFrame(np.array(dic_list))
df_.columns = ['id','信用代码','英文名','电话','官网','简介','行业','地址','股票代码']
df_.to_excel(writer, index=False)
writer.save()
df_1 = pd.DataFrame(np.array(error_list1))
df_2 = pd.DataFrame(np.array(error_list2))
df_3 = pd.DataFrame(np.array(error_list3))
df_1.columns = ['id','股票代码']
df_2.columns = ['id','股票代码']
df_3.columns = ['id','股票代码']
df_1.to_excel(writer_, index=False,sheet_name='股票代码未查询到信息')
df_2.to_excel(writer_, index=False,sheet_name='没有基本信息')
df_3.to_excel(writer_, index=False,sheet_name='其它错误原因')
writer_.save()
# 释放资源
baseCore.close()
if __name__ == '__main__':
beginWork()
import json
import time
import pandas as pd
import requests
import urllib3
from bs4 import BeautifulSoup
# sys.path.append(r'F:\zzsn\zzsn_spider\base')
# import BaseCore
from retry import retry
from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
taskType = '企业基本信息/雅虎财经'
baseCore = BaseCore.BaseCore()
r = baseCore.r
log = baseCore.getLogger()
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': "Windows",
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
@retry(tries=5, delay=3)
def getRes(url):
response = requests.get(url, headers=headers, verify=False)
if response.status_code != 200:
raise requests.RequestException(f'unexpected status code: {response.status_code}')
return response
# 根据股票代码 获取企业基本信息 高管信息
def getInfo(xydm, gpdm, start):
if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5:
gpdm_ = str(gpdm)[1:]
else:
gpdm_ = gpdm
elif str(gpdm)[-2:] == '.N' or str(gpdm)[-2:] == '.O':
gpdm_ = gpdm[:-2]
else:
gpdm_ = gpdm
retData = {}
url = f'https://finance.yahoo.com/quote/{gpdm_}/profile?p={gpdm_}'
log.info(url)
time.sleep(3)
try:
response = getRes(url)
except:
log.error(f"{gpdm}------访问基本信息页面失败")
state = -1
exception = '访问基本信息页面失败'
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog('', taskType, state, takeTime, url, exception)
r.lpush('BaseInfoEnterprise:gwqy_socialCode', '')
return state, retData
if url != response.url:
log.error(f'{gpdm}------请求失败')
state = -1
# r.lpush('BaseInfoEnterprise:gwqy_socialCode_gg', xydm)
return state, retData
state = 1
soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
# 高管信息
retPeople = []
try:
list_people = page.find('table', {'class': 'W(100%)'}).find_all('tr')[1:]
except:
list_people = []
for one_people in list_people:
try:
p_name = one_people.find_all('td')[0].text
except:
p_name = ''
continue
try:
p_zhiwu = one_people.find_all('td')[1].text
except:
p_zhiwu = ''
try:
p_money = one_people.find_all('td')[2].text
except:
p_money = ''
try:
p_xingshi = one_people.find_all('td')[3].text
except:
p_xingshi = ''
try:
p_year = one_people.find_all('td')[4].text
except:
p_year = ''
if (p_zhiwu == "N/A"):
p_zhiwu = ""
if (p_money == "N/A"):
p_money = ""
if (p_xingshi == "N/A"):
p_xingshi = ""
if (p_year == "N/A"):
p_year = ""
dic_main_people = {
'股票代码': gpdm,
'信用代码': xydm,
'姓名': p_name,
'职务': p_zhiwu,
'薪资': p_money,
'行使': p_xingshi,
'出生年份': p_year
}
retPeople.append(dic_main_people)
retData['people_info'] = retPeople
log.info(f"获取高管信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
response.close()
return state, retData
@retry(tries=3, delay=2)
def sendPost(json_updata):
response = requests.post('http://114.115.236.206:9988/datapull/sync/executive', data=json_updata,
timeout=300, verify=False)
if (response.status_code == 200):
retJson = json.loads(response.content.decode('utf-8'))
if (retJson['success'] or retJson['success'] == 'true'):
pass
else:
raise requests.RequestException('executive sync interface reported failure')
else:
raise requests.RequestException(f'unexpected status code: {response.status_code}')
# 保存高管信息
def savePeopleInfo(info, xydm, start):
# 高管信息调用接口
list_people = info['people_info']
list_one_info = []
for i in range(0, len(list_people)):
dic_json = {
"socialCreditCode": list_people[i]['信用代码'],
"name": list_people[i]['姓名'],
"sex": '',
"education": '',
"position": list_people[i]['职务'],
"salary": list_people[i]['薪资'],
"birthYear": list_people[i]['出生年份'],
"shareNum": '',
"shareRatio": '',
"benefitShare": '',
"currentTerm": '',
"personInfo": '',
"sort": str(i + 1)
}
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
if json_updata == '[]':
log.info("没有高管")
pass
else:
try:
sendPost(json_updata)
except:
log.error(f"保存高管接口失败")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
return state
state = 1
log.info(f"保存高管信息--{xydm},耗时{baseCore.getTimeCost(start, time.time())}")
return state
# 采集工作
def beginWork():
while True:
social_code = baseCore.redicPullData('BaseInfoEnterprise:gwqy_socialCode_gg')
# social_code = 'ZZSN231114182705007'
if social_code == 'None' or not social_code:
time.sleep(20)
break
# 数据库中获取基本信息
data = baseCore.getInfomation(social_code)
gpdm = data[3]
xydm = data[2]
# 获取该企业对应项目的采集次数
start_time = time.time()
try:
state, retData = getInfo(xydm, gpdm, start_time)
if state == 1:
state = savePeopleInfo(retData, xydm, start_time)
time.sleep(1)
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '')
else:
pass
except Exception as e:
log.error(f'{xydm}===错误原因:{e}')
pass
# 释放资源
baseCore.close()
if __name__ == '__main__':
beginWork()
from kafka import KafkaConsumer
@@ -9,13 +9,15 @@ import datetime
import time
import redis
import hashlib
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
import random
from kafka import KafkaProducer
r = redis.Redis(host="localhost", port=6379)
# r = redis.Redis(host="localhost", port=6379)
# 将数据转换成hash值,用来对文章url进行去重,实现增量爬虫
@@ -46,12 +48,22 @@ def add_url(article_url):
# 使用模拟浏览器来获取cookie值
def get_cookie():
executable_path = r"D:\chrome\chromedriver.exe"
opt = webdriver.ChromeOptions()
opt.add_argument('--headless')
browser = webdriver.Chrome(chrome_options=opt, executable_path=executable_path)
# executable_path = r"F:\spider\117\chromedriver.exe"
# opt = webdriver.ChromeOptions()
# #opt.add_argument('--headless')
#
# browser = webdriver.Chrome(chrome_options=opt, executable_path=executable_path)
path = r"F:\spider\117\chromedriver.exe"
service = Service(path)
chrome_options = webdriver.ChromeOptions()
# chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_experimental_option(
"excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
chrome_options.add_argument('lang=zh-CN,zh,zh-TW,en-US,en')
chrome_options.add_argument('user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')
browser = webdriver.Chrome(options=chrome_options, service=service)
browser.get("https://weibo.com/")
# 等待界面出现再获取cookie
WebDriverWait(browser, 10).until(EC.element_to_be_clickable((By.XPATH, "//*[@id=\"app\"]")))
@@ -66,8 +78,9 @@ def get_cookie():
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
r.set("cookies", str(cookies), ex=600)
# r.set("cookies", str(cookies), ex=600)
# print(cookies)
return cookies
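# --- Illustrative sketch: get_cookie() already returns a plain name/value dict, so
# a session can consume it directly without the str()/regex round-trip used in
# get_content_by_user_uid below. The local import keeps the sketch self-contained.
def _example_session_from_cookies(cookies):
    import requests
    s = requests.session()
    s.cookies.update(cookies)
    return s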
# 代码主程序,通过给出的用户url来获取用户发布的文章
@@ -78,13 +91,13 @@ def get_content_by_user_uid(url, sid):
}
s = requests.session()
cookies_str = r.get("cookies")
if cookies_str == None:
get_cookie()
cookies = json.loads('{' + re.findall("{(.*?)}", str(r.get("cookies")).replace("\'", "\""))[0] + '}')
else:
cookies = json.loads('{' + re.findall("{(.*?)}", str(cookies_str).replace("\'", "\""))[0] + '}')
# cookies_str = r.get("cookies")
cookies_str = get_cookie()
# if cookies_str == None:
# get_cookie()
# cookies = json.loads('{' + re.findall("{(.*?)}", str(r.get("cookies")).replace("\'", "\""))[0] + '}')
# else:
cookies = json.loads('{' + re.findall("{(.*?)}", str(cookies_str).replace("\'", "\""))[0] + '}')
s.cookies.update(cookies)
@@ -115,10 +128,11 @@ def get_content_by_user_uid(url, sid):
# print(uid)
if add_uid(uid): # 若uid已存在于redis中,只爬取该作者第一页文章
num_page = 2
else: # 若uid不存在redis中,爬取1000页
num_page = 1000
# if add_uid(uid): # 若uid已存在于redis中,只爬取该作者第一页文章
# num_page = 2
# else: # 若uid不存在redis中,爬取1000页
# num_page = 1000
num_page = 10
# 爬取程序入口
for page in range(1, num_page): # 对后面进行无限翻页,直到无内容显示后跳出
@@ -142,8 +156,8 @@ def get_content_by_user_uid(url, sid):
title_one_con = list_news_[0]['url_title'] # 文章标题
url_one_con = "https://weibo.com/ttarticle/p/show?id=" + list_news_[0]['page_id'] # 文章链接URL
if add_url(url_one_con): # 若url已存在,则返回TRUE,跳出本次循环
continue
# if add_url(url_one_con): # 若url已存在,则返回TRUE,跳出本次循环
# continue
for num_res in range(0, 3): # url若访问失败可以最多访问3次
try:
@@ -253,20 +267,20 @@ def get_content_by_user_uid(url, sid):
# df_con = pd.DataFrame(list_all_info)
# df_con.to_excel(f'{uid}.xlsx')
for one_news_info in list_all_info: # 将每一个文章数据转换为json格式,把json文件用kafka发送出去
for num_pro in range(0, 3):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("crawlerInfo",
json.dumps(one_news_info, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
# time.sleep(1)
# print(json.dumps(one_news_info, ensure_ascii=False))
break
except:
time.sleep(5)
print('发送kafka失败!正在重新发送!')
continue
# for one_news_info in list_all_info: # 将每一个文章数据转换为json格式,把json文件用kafka发送出去
# for num_pro in range(0, 3):
# try:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
# kafka_result = producer.send("crawlerInfo",
# json.dumps(one_news_info, ensure_ascii=False).encode('utf8'))
# print(kafka_result.get(timeout=10))
# # time.sleep(1)
# # print(json.dumps(one_news_info, ensure_ascii=False))
# break
# except:
# time.sleep(5)
# print('发送kafka失败!正在重新发送!')
# continue
# print(list_all_info[0]['title'])
return
@@ -289,5 +303,5 @@ def consume():
if __name__ == "__main__":
# r = redis.Redis(host="localhost",port=6379)
# consume()
get_content_by_user_uid('https://weibo.com/2656274875?refer_flag=1001030103_','1571698920447193090')
get_content_by_user_uid('https://weibo.com/u/1689572847','1571698920447193090')