Commit 1c9b2f85  Author: LiuLiYuan

新浪财经 10/13

Parent 864508c6
@@ -25,18 +25,15 @@ headers = {
    'Cache-Control': 'no-cache',
    'Pragma': 'no-cache'
}
-taskType = '新浪财经/天眼查'
+taskType = '企业动态/新浪财经'

# 获取企业信息
def getinfomation(social_code):
-    selectSql = f"select * from mgzqjywyh_list where state = '2' and xydm='{social_code}' "
+    selectSql = f"select * from mgzqyjwyh_list where state = '2' and xydm='{social_code}' "
    cursor.execute(selectSql)
    data = cursor.fetchone()
-    cnx.commit()
    data = list(data)
-    cursor.close()
-    cnx.close()
    return data
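Side note on the connection lifecycle: with cursor.close()/cnx.close() removed, getinfomation now relies on the module-level connection staying open for the whole run. A minimal sketch of how each call could take its own short-lived cursor and a parameterized filter (assuming cnx is a pymysql-compatible connection; this variant is illustrative, not part of the commit):

# Illustrative only: per-call cursor over the shared connection, with a
# parameterized query instead of f-string interpolation.
def getinfomation(social_code):
    selectSql = "select * from mgzqyjwyh_list where state = '2' and xydm = %s"
    with cnx.cursor() as cur:   # cnx: module-level connection (assumed pymysql-compatible)
        cur.execute(selectSql, (social_code,))
        data = cur.fetchone()
    return list(data) if data else None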
@@ -66,13 +63,19 @@ def getDic(social_code, li):
        contentText = smart.extract_by_url(href_).text
        # 不带标签正文
        content = smart.extract_by_url(href_).cleaned_text
-        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        if content == '':
+            log.error(f'{href}===页面解析失败')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
+            return 0
    except:
        log.error(f'{href}===页面解析失败')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
-        return
+        return 0
+    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_news = {
        'attachmentIds': '',
        'author': author,
@@ -110,6 +113,7 @@ def getDic(social_code, li):
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
+    return 1

# 数据发送至Kafka
@retry(tries=3, delay=1)
@@ -157,64 +161,73 @@ def selectUrl(url, social_code):
def doJob():
    # while True:
-    #     social_code = ''
-    #     # 从redis中获取企业信用代码
-    #     try:
-    #         data = getinfomation(social_code)
-    #         com_code = data[6]
-    com_code = 'AAPL'
-    social_code = 'ZZSN22080900000004'
-    log.info(f'{social_code}==={com_code}===开始采集')
+    num_ok = 0
+    num_error = 0
+    # 从redis中获取企业信用代码
+    # social_code = baseCore.redicPullData('NewsEnterprise:nyse_socialCode')
+    social_code = 'ZZSN22080900000010'
    start_time = time.time()
-    pageIndex = 1
-    while True:
-        # 拼接链接
-        # url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1'
-        url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
-        soup_home = getrequests(url)
-        li_list = soup_home.select('body > div > div.xb_news > ul > li')
-        # 有可能第一次获取的li标签列表为空
-        for i in range(5):
-            if len(li_list) == 0:
-                li_list = soup_home.select('body > div > div.xb_news > ul > li')
-            else:
-                break
-        for li in li_list:
-            title = li.find('a').text
-            if title == '':
-                continue
-            href = li.find('a').get('href')
-            selects = selectUrl(href, social_code)
-            if selects:
-                log.info(f'{url}==已采集过')
-            else:
-                getDic(social_code, li)
-            break
-        break
-        # # 如果采集到已采集过动态,证明最新发布动态已经全部采集过
-        # 增量使用
-        # if selects:
-        #     break
-        next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
-        for i in range(5):
-            if len(next) == 0:
-                next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
-            else:
-                break
-        if len(next) == 2:
-            break
-        pageIndex += 1
-        time.sleep(2)
-    log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time,time.time())}')
-    # except:
-    #     log.info(f'==={social_code}=====获取企业信息失败====')
-    #     #重新塞入redis
-    #     baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
-    #     state = 0
-    #     takeTime = baseCore.getTimeCost(start, time.time())
-    #     baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
-    #     time.sleep(5)
+    try:
+        data = getinfomation(social_code)
+        com_code = data[3]
+        log.info(f'{social_code}==={com_code}===开始采集')
+        pageIndex = 1
+        while True:
+            # 拼接链接
+            # url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1'
+            url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
+            soup_home = getrequests(url)
+            li_list = soup_home.select('body > div > div.xb_news > ul > li')
+            # 有可能第一次获取的li标签列表为空
+            for i in range(5):
+                if len(li_list) == 0:
+                    li_list = soup_home.select('body > div > div.xb_news > ul > li')
+                else:
+                    break
+            for li in li_list:
+                title = li.find('a').text
+                if title == '':
+                    continue
+                href = li.find('a').get('href')
+                selects = selectUrl(href, social_code)
+                if selects:
+                    log.info(f'{url}==已采集过')
+                else:
+                    flg = getDic(social_code, li)
+                    if flg == 1:
+                        num_ok += 1
+                    else:
+                        num_error += 1
+                # # 如果采集到已采集过动态,证明最新发布动态已经全部采集过
+                # 增量使用
+                if selects:
+                    break
+            next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
+            for i in range(5):
+                if len(next) == 0:
+                    next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
+                else:
+                    break
+            if len(next) == 2:
+                break
+            if len(next) == 1:
+                text_flag = next[0].text
+                if text_flag != '下页':
+                    break
+            pageIndex += 1
+            time.sleep(2)
+        log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
+    except Exception as e:
+        print(e)
+        log.info(f'==={social_code}=====获取企业信息失败====')
+        # 重新塞入redis
+        baseCore.rePutIntoR('NewsEnterprise:nyse_socialCode', social_code)
+        state = 0
+        takeTime = baseCore.getTimeCost(start_time, time.time())
+        baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
+        time.sleep(5)


if __name__ == "__main__":
    doJob()
+    baseCore.close()
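The commented redicPullData line sketches the intended batch mode: pull one social code per iteration from the NewsEnterprise:nyse_socialCode queue instead of hard-coding it. A hedged sketch of that outer loop, assuming doJob is refactored to accept the code as a parameter and that redicPullData returns a falsy value when the queue is empty:

# Sketch of the Redis-driven loop hinted at by the comments above (assumptions noted inline).
def run():
    while True:
        social_code = baseCore.redicPullData('NewsEnterprise:nyse_socialCode')
        if not social_code:      # assumed empty-queue signal
            time.sleep(60)
            continue
        doJob(social_code)       # requires doJob(social_code) instead of the hard-coded value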
@@ -30,33 +30,68 @@ headers = {
taskType = '企业动态/新浪财经/国内'
pattern = r"\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}"

# 获取响应页面
@retry(tries=3, delay=1)
def getrequests(url):
    ip = baseCore.get_proxy()
-    req = requests.get(url, headers=headers,proxies=ip)
+    req = requests.get(url, headers=headers, proxies=ip)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'html.parser')
    return soup

-# 解析内容
+# 页面内容解析
+def getContent(href):
+    soup = getrequests(href)
+    if 'http://stock.finance' in href:
+        div_content = soup.find('div',class_='blk_container')
+        content = div_content.text
+        contentWithTag = str(div_content)
+    else:
+        div_content = soup.find('div', attrs={'id': 'artibody'})
+        div_list = div_content.find_all('div')
+        for div in div_list:
+            try:
+                if div.get('class')[0] != 'img_wrapper':
+                    div.decompose()
+            except:
+                div.decompose()
+        script_list = div_content.find_all('script')
+        for script in script_list:
+            script.decompose()
+        style_list = div_content.find_all('style')
+        for style in style_list:
+            style.decompose()
+        img_list = div_content.find_all('img')
+        content = div_content.text.lstrip().strip()
+        contentWithTag = str(div_content)
+        for img in img_list:
+            img_src = img.get('src')
+            if 'https' not in img_src:
+                contentWithTag = contentWithTag.replace(img_src, f'https:{img_src}')
+    return content, contentWithTag

+# 获取解析内容并发送
def getDic(social_code, title, href, pub_time):
    start_time = time.time()
-    if 'http' not in href:
-        href = 'https://finance.sina.com.cn' + href
-    href_ = href.replace('https', 'http')
+    pattern = re.compile(r'[\u4e00-\u9fa5]+')
    try:
-        # 带标签正文
-        contentText = smart.extract_by_url(href_).text
-        # 不带标签正文
-        content = smart.extract_by_url(href_).cleaned_text
+        content, contentWithTag = getContent(href)
        if content == '':
            log.error(f'{href}===页面解析失败')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
            return 0
+        matches = re.findall(pattern, content)
+        if len(matches) == 0:
+            log.error(f'{href}===页面解析乱码')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析乱码')
+            return 0
    except:
        log.error(f'{href}===页面解析失败')
        state = 0
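The new `[\u4e00-\u9fa5]+` check is a heuristic for detecting mojibake: a correctly decoded Chinese article contains CJK characters, while text decoded with the wrong charset usually contains none. A small self-contained illustration (not part of the crawler):

import re

pattern = re.compile(r'[\u4e00-\u9fa5]+')

good = '新浪财经发布公司动态'                                   # correctly decoded text
bad = '新浪财经发布公司动态'.encode('gbk').decode('latin-1')    # simulated wrong-charset decode

print(bool(pattern.findall(good)))   # True  -> accepted
print(bool(pattern.findall(bad)))    # False -> treated as 页面解析乱码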
@@ -68,7 +103,7 @@ def getDic(social_code, title, href, pub_time):
        'attachmentIds': '',
        'author': '',
        'content': content,
-        'contentWithTag': contentText,
+        'contentWithTag': contentWithTag,
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
@@ -84,12 +119,11 @@ def getDic(social_code, title, href, pub_time):
        'socialCreditCode': social_code,
        'year': pub_time[:4]
    }
-    # print(dic_news)
    try:
        sendKafka(dic_news, start_time)
        log.info(f'Kafka发送成功')
        try:
-            insertMysql(social_code, href)
+            insertMysql(social_code, href, pub_time, title, content)
            log.info(f'数据库保存成功')
        except:
            log.error(f'{href}===数据入库失败')
@@ -126,14 +160,17 @@ def sendKafka(dic_news, start_time):
# 数据保存入库,用于判重
@retry(tries=3, delay=1)
-def insertMysql(social_code, link):
-    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
+def insertMysql(social_code, link, pub_time, title, content):
+    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,content,create_time) values(%s,%s,%s,%s,%s,%s,%s,now())'''
    # 动态信息列表
    list_info = [
        social_code,
        link,
        '新浪财经',
        '2',
+        pub_time,
+        title,
+        content[0:500]
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()
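The widened insert now also persists publish_time, title and the first 500 characters of the body (the slice presumably matches a bounded VARCHAR column; that limit is an assumption). A usage sketch mirroring how getDic calls the new signature, with made-up values:

# Hypothetical call; insertMysql itself truncates content to 500 characters.
content = '示例正文,' * 200
insertMysql(
    'ZZSN22080900000010',                         # social_code
    'https://finance.sina.com.cn/example.shtml',  # link (made-up URL)
    '2023-10-13 09:30:00',                        # pub_time
    '示例标题',                                    # title
    content,                                      # full body; [0:500] applied inside
)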
@@ -175,7 +212,7 @@ def doJob():
        continue
        page = 1
        num_ok = 0
-        num_error =0
+        num_error = 0
        while True:
            url = f'http://vip.stock.finance.sina.com.cn/corp/view/vCB_AllNewsStock.php?symbol={gpdm_}&Page={page}'
            soup = getrequests(url)
@@ -184,7 +221,7 @@ def doJob():
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
-                r.rpush('NewsEnterprise:gnqy_nyse_socialCode',social_code)
+                r.rpush('NewsEnterprise:gnqy_nyse_socialCode', social_code)
                time.sleep(1800)
                break
            try:
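When the page returns 拒绝访问, the code logs the ban, re-queues the company and backs off for 1800 s. A hedged sketch of the same requeue-and-back-off pattern factored into a helper (r, log, baseCore and taskType as used above; the helper name and defaults are hypothetical):

# Hypothetical helper mirroring the ban-handling branch shown above.
def handle_ip_ban(social_code, url, start_time, queue='NewsEnterprise:gnqy_nyse_socialCode', backoff=1800):
    log.error(f'{social_code}===ip封禁')
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(social_code, taskType, 0, takeTime, url, f'{social_code}===ip封禁')
    r.rpush(queue, social_code)   # put the company back so a later run can retry it
    time.sleep(backoff)           # wait for the ban to lapse before continuing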
@@ -197,14 +234,16 @@ def doJob():
                    if title == '':
                        continue
                    href = a_list[i].get('href')
-                    selects = selectUrl(href,social_code)
+                    selects = selectUrl(href, social_code)
                    if selects:
                        log.info(f'{href}===已采集')
                        continue
                    if 'http' not in href:
                        href = 'https://finance.sina.com.cn' + href
                    pub_time = time_list[i].replace('\xa0', ' ') + ":00"
-                    flg = getDic(social_code,title,href,pub_time)
+                    if 'https://cj.sina' not in href and 'https://t.cj.sina' not in href:
+                        print(href)
+                        flg = getDic(social_code, title, href, pub_time)
                    if flg == 0:
                        num_error += 1
                    else:
@@ -216,7 +255,7 @@ def doJob():
                        state = 0
                        takeTime = baseCore.getTimeCost(start_time, time.time())
                        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
-                        break
+                        # break
            except:
                log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
                state = 0
@@ -226,9 +265,11 @@ def doJob():
            if '下一页' not in next_flg:
                break
            page += 1
-            break
+            # break
-        log.info(f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
+        log.info(
+            f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
+        break

if __name__ == "__main__":
......
@@ -55,24 +55,51 @@ def getrequests(url):
    soup = BeautifulSoup(req.text, 'html.parser')
    return soup

+# 页面内容解析
+def getContent(href):
+    soup = getrequests(href)
+    div_content = soup.find('div',attrs={'id':'artibody'})
+    div_list = div_content.find_all('div')
+    for div in div_list:
+        try:
+            if div.get('class')[0] != 'img_wrapper':
+                div.decompose()
+        except:
+            div.decompose()
+    script_list = div_content.find_all('script')
+    for script in script_list:
+        script.decompose()
+    style_list = div_content.find_all('style')
+    for style in style_list:
+        style.decompose()
+    img_list = div_content.find_all('img')
+    content = div_content.text.lstrip().strip()
+    contentWithTag = str(div_content)
+    for img in img_list:
+        img_src = img.get('src')
+        if 'https' not in img_src:
+            contentWithTag = contentWithTag.replace(img_src,f'https:{img_src}')
+    return content,contentWithTag

-# 解析内容
+# 获取解析内容并发送
def getDic(social_code, title, href, pub_time):
    start_time = time.time()
-    if 'http' not in href:
-        href = 'https://finance.sina.com.cn' + href
-    href_ = href.replace('https', 'http')
+    pattern = re.compile(r'[\u4e00-\u9fa5]+')
    try:
-        # 带标签正文
-        contentText = smart.extract_by_url(href_).text
-        # 不带标签正文
-        content = smart.extract_by_url(href_).cleaned_text
+        content,contentWithTag = getContent(href)
        if content == '':
            log.error(f'{href}===页面解析失败')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
            return 0
+        matches = re.findall(pattern, content)
+        if len(matches) == 0:
+            log.error(f'{href}===页面解析乱码')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析乱码')
+            return 0
    except:
        log.error(f'{href}===页面解析失败')
        state = 0
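The new getContent keeps only img_wrapper divs inside #artibody, drops script/style tags, and upgrades protocol-relative image URLs to https. A small standalone illustration of the script/style removal and the URL fix, using made-up markup (BeautifulSoup only, not part of the crawler):

from bs4 import BeautifulSoup

html = '''<div id="artibody"><p>正文段落</p>
<div class="img_wrapper"><img src="//n.sinaimg.cn/finance/demo.jpg"></div>
<script>var ad = 1;</script><style>.x{}</style></div>'''

div = BeautifulSoup(html, 'html.parser').find('div', attrs={'id': 'artibody'})
for tag in div.find_all(['script', 'style']):
    tag.decompose()                     # strip non-content tags
contentWithTag = str(div)
for img in div.find_all('img'):
    src = img.get('src')
    if src and 'https' not in src:
        contentWithTag = contentWithTag.replace(src, f'https:{src}')
print(contentWithTag)                   # img src now starts with https://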
@@ -84,7 +111,7 @@ def getDic(social_code, title, href, pub_time):
        'attachmentIds': '',
        'author': '',
        'content': content,
-        'contentWithTag': contentText,
+        'contentWithTag': contentWithTag,
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
@@ -100,23 +127,22 @@ def getDic(social_code, title, href, pub_time):
        'socialCreditCode': social_code,
        'year': pub_time[:4]
    }
-    # print(dic_news)
-    # try:
-    #     sendKafka(dic_news, start_time)
-    #     log.info(f'Kafka发送成功')
-    #     try:
-    #         insertMysql(social_code, href)
-    #         log.info(f'数据库保存成功')
-    #     except:
-    #         log.error(f'{href}===数据入库失败')
-    #         state = 0
-    #         takeTime = baseCore.getTimeCost(start_time, time.time())
-    #         baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
-    # except:
-    #     log.error(f'{href}===发送Kafka失败')
-    #     state = 0
-    #     takeTime = baseCore.getTimeCost(start_time, time.time())
-    #     baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
+    try:
+        sendKafka(dic_news, start_time)
+        log.info(f'Kafka发送成功')
+        try:
+            insertMysql(social_code, href,pub_time)
+            log.info(f'数据库保存成功')
+        except:
+            log.error(f'{href}===数据入库失败')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
+    except:
+        log.error(f'{href}===发送Kafka失败')
+        state = 0
+        takeTime = baseCore.getTimeCost(start_time, time.time())
+        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
    return 1
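One detail worth flagging in the uncommented block: the call passes three arguments while the insertMysql defined below now takes five, so a call matching the new signature would presumably look like the sketch here (title and content as already built in getDic):

try:
    insertMysql(social_code, href, pub_time, title, content)  # five arguments, matching the new definition
    log.info(f'数据库保存成功')
except:
    log.error(f'{href}===数据入库失败')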
@@ -142,14 +168,17 @@ def sendKafka(dic_news, start_time):
# 数据保存入库,用于判重
@retry(tries=3, delay=1)
-def insertMysql(social_code, link):
-    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
+def insertMysql(social_code, link,pub_time,title,content):
+    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,content,create_time) values(%s,%s,%s,%s,%s,%s,%s,now())'''
    # 动态信息列表
    list_info = [
        social_code,
        link,
        '新浪财经',
        '2',
+        pub_time,
+        title,
+        content[:500]
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()
@@ -174,9 +203,9 @@ def doJob():
        data = baseCore.getInfomation(social_code)
        gpdm = data[3]
        log.info(f'{social_code}==={gpdm}===开始采集')
-        # if gpdm == '' or not gpdm:
-        #     log.error(f'{social_code}===股票代码为空')
-        #     continue
+        if gpdm == '' or not gpdm:
+            log.error(f'{social_code}===股票代码为空')
+            continue
        gpdm_ = gpdm.split('.')[0]
        if len(gpdm_) != 5:
            gpdm_ = gpdm_.zfill(5)
@@ -191,7 +220,7 @@ def doJob():
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
-                # r.rpush('NewsEnterprise:xgqy_nyse_socialCode',social_code)
+                r.rpush('NewsEnterprise:xgqy_nyse_socialCode',social_code)
                time.sleep(1800)
                break
            next_flg = soup.find('div',class_='part02').text
@@ -212,13 +241,12 @@ def doJob():
                        log.info(f'{href}===已采集过')
                        continue
                    pub_time = format_time(li.find('span').text)
-                    print(title)
                    flag = getDic(social_code,title,href,pub_time)
                    if flag == 1:
                        num_ok += 1
                    else:
                        num_error += 1
-                    time.sleep(0.5)
+                    time.sleep(1)
            except Exception as e:
                ee = e.__traceback__.tb_lineno
                log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
......
@@ -10,9 +10,6 @@ import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
-from obs import ObsClient
-import fitz
-from urllib.parse import unquote
from base.BaseCore import BaseCore

taskType = '企业公告/新浪财经/国内'
@@ -30,141 +27,41 @@ headers = {
}
pattern = r"\d{4}-\d{2}-\d{2}"
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
# 获取文件大小
def convert_size(size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units) - 1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url, pdf_name, type_id, social_code):
start_time = time.time()
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'Liuliyuans',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
for i in range(0, 3):
try:
name = pdf_name + '.pdf'
now_time = time.strftime("%Y-%m")
result = obsClient.putContent('zzsn', f'ZJH/{now_time}/' + name, content=response.content)
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
break
except:
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
def secrchATT(item_id, name, type_id, order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
cursor_.execute(sel_sql, (item_id, name, type_id, order_by))
selects = cursor_.fetchone()
return selects
# 插入到att表 返回附件id
def tableUpdate(retData, com_name, year, pdf_name, num):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size)
cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交
log.info("更新完成:{}".format(Upsql))
selects = secrchATT(item_id, pdf_name, type_id, order_by)
id = selects[0]
return id
def ifInstert(social_code, pdf_url):
-    sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='证监会' and type='1' '''
+    sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='新浪财经' and type='1' '''
    cursor.execute(sel_sql, (social_code, pdf_url))
    selects = cursor.fetchone()
    return selects

+@retry(tries=3, delay=1)
+def sendKafka(dic_news):
+    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+    kafka_result = producer.send("researchReportTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
+    print(kafka_result.get(timeout=10))

def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time, com_name, num):
    # 判断文件是否已经存在obs服务器中
    # file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc'
+    soup = getrequests(pdf_url)
+    pdf_url = soup.find('table',attrs={'id':'allbulletin'}).find('tr',class_='gray').find('a').get('href')
    now_time = time.strftime("%Y-%m")
-    file_path = 'ZJH/' + now_time + '/' + pdf_name + '.pdf'
-    response = obsClient.getObjectMetadata('zzsn', file_path)
-    if response.status >= 300:
-        log.info('=====文件不存在obs=====')
-        pass
-    else:
-        log.info(f'=====文件存在obs========{file_path}')
-        return False
+    file_path = 'XLCJNotice/' + now_time + '/' + pdf_name + '.pdf'
+    response = baseCore.obsexist(file_path)
+    if not response:
+        return False
    # 上传至华为云服务器
-    retData = uptoOBS(pdf_url, pdf_name, 8, social_code)
+    retData = baseCore.uptoOBS(pdf_url, pdf_name, 8, social_code, 'XLCJNotice', taskType, start_time, 'LiuLiYuan')
    # 附件插入att数据库
    if retData['state']:
        pass
    else:
        log.info(f'====pdf解析失败====')
        return False
-    att_id = tableUpdate(retData, com_name, year, pdf_name, num)
+    att_id = baseCore.tableUpdate(retData, com_name, year, pdf_name, num)
    if att_id:
        pass
    else:
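The new lookup drills into the allbulletin table for the real download link before touching OBS; if Sina ever changes that markup, the chained find calls raise AttributeError. A more defensive sketch of the same extraction (purely illustrative, not part of the commit):

# Hypothetical guarded version of the pdf_url extraction added above.
def extract_pdf_url(soup):
    table = soup.find('table', attrs={'id': 'allbulletin'})
    if table is None:
        return None
    row = table.find('tr', class_='gray')
    link = row.find('a') if row else None
    return link.get('href') if link else None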
@@ -195,11 +92,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time, com_name, num):
    # print(dic_news)
    # 将相应字段通过kafka传输保存
    try:
-        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
-        kafka_result = producer.send("researchReportTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
-        print(kafka_result.get(timeout=10))
+        sendKafka(dic_news)
        dic_result = {
            'success': 'ture',
            'message': '操作成功',
@@ -249,6 +142,7 @@ def getrequests(url):
    return soup

+# 数据库插入
@retry(tries=3, delay=1)
def insertMysql(social_code, link, pub_time):
    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
@@ -265,6 +159,8 @@ def insertMysql(social_code, link, pub_time):
def doJob():
+    # while True:
+    #     social_code = baseCore.redicPullData()
    start_time = time.time()
    social_code = '91440300192185379H'
    data = baseCore.getInfomation(social_code)
@@ -277,74 +173,84 @@ def doJob():
    # continue
    page = 1
    num = 1
-    # while True:
-    url = f'https://vip.stock.finance.sina.com.cn/corp/view/vCB_AllBulletin.php?stockid={gpdm}&Page={page}'
-    soup = getrequests(url)
-    # if '拒绝访问' in soup.text:
-    #     log.error(f'{social_code}===ip封禁')
-    #     state = 0
-    #     takeTime = baseCore.getTimeCost(start_time, time.time())
-    #     baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
-    #     # r.rpush('NewsEnterprise:gnqy_nyse_socialCode',social_code)
-    #     time.sleep(1800)
-    #     break
-    # try:
-    div_flg = soup.find('div', class_='tagmain').text
-    if '暂时没有数据!' in div_flg:
-        log.info(f"{social_code}==={gpdm}===没有公告")
-    else:
-        ul = soup.find('div', class_='datelist').find('ul')
-        a_list = ul.find_all('a')
-        time_list = re.findall(pattern, str(ul))
-        for i in range(len(a_list)):
-            # try:
-            name_pdf = a_list[i].text.lstrip().strip()
-            if name_pdf == '':
-                continue
-            href = a_list[i].get('href')
-            if 'http' not in href:
-                href = 'https://finance.sina.com.cn' + href
-            selects = ifInstert(short_name, social_code, href)
-            if selects:
-                log.info(f'{href}===已采集')
-                continue
-            pub_time = format_time(time_list[i])
-            year = pub_time[:4]
-            result = GetContent(href, name_pdf, social_code, year, pub_time, start_time, com_name, num)
-            num += 1
-            if result:
-                # 公告信息列表
-                log.info(f'{short_name}==============解析传输操作成功')
-                state = 1
-                takeTime = baseCore.getTimeCost(start_time, time.time())
-                baseCore.recordLog(social_code, taskType, state, takeTime, href, '成功')
-                # 发送kafka成功之后 再插入数据库
-                insert = insertMysql(social_code, href, pub_time)
-                if insert:
-                    log.info(f'===={social_code}========{name_pdf}=====插入库成功')
-                    pass
-                else:
-                    continue
-            # time.sleep(0.5)
-            # except Exception as e:
-            #     ee = e.__traceback__.tb_lineno
-            #     log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
-            #     state = 0
-            #     takeTime = baseCore.getTimeCost(start_time, time.time())
-            #     baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
-            #     break
-            # except:
-            # log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
-            # state = 0
-            # takeTime = baseCore.getTimeCost(start_time, time.time())
-            # baseCore.recordLog(social_code, taskType, state, takeTime, url, f'获取信息列表失败')
-            # next_flg = soup.select('#con02-7 > table > tr')[1].select('div')[2].text
-            # if '下一页' not in next_flg:
-            #     break
-            # page += 1
-            # break
+    while True:
+        url = f'https://vip.stock.finance.sina.com.cn/corp/view/vCB_AllBulletin.php?stockid={gpdm}&Page={page}'
+        print(url)
+        soup = getrequests(url)
+        # if '拒绝访问' in soup.text:
+        #     log.error(f'{social_code}===ip封禁')
+        #     state = 0
+        #     takeTime = baseCore.getTimeCost(start_time, time.time())
+        #     baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
+        #     # r.rpush('NewsEnterprise:gnqy_nyse_socialCode',social_code)
+        #     time.sleep(1800)
+        #     break
+        # try:
+        div_flg = soup.find('div', class_='tagmain').text
+        if '暂时没有数据!' in div_flg:
+            log.info(f"{social_code}==={gpdm}===没有公告")
+        else:
+            ul = soup.find('div', class_='datelist').find('ul')
+            a_list = ul.find_all('a')
+            time_list = re.findall(pattern, str(ul))
+            for i in range(len(a_list)):
+                # try:
+                name_pdf = a_list[i].text.lstrip().strip()
+                if name_pdf == '':
+                    continue
+                href = a_list[i].get('href')
+                if 'http' not in href:
+                    href = 'https://vip.stock.finance.sina.com.cn' + href
+                soup_href = getrequests(href)
+                th = soup_href.find('table',attrs={'id':'allbulletin'}).find_all('th')[0]
+                if '下载公告' in th.text:
+                    pdf_url = th.find('a',attrs={'target':'_blank'}).get('href')
+                else:
+                    log.error(f"{href}===没有公告下载地址")
+                    continue
+                selects = ifInstert(social_code, pdf_url)
+                if selects:
+                    log.info(f'{pdf_url}===已采集')
+                    continue
+                pub_time = format_time(time_list[i])
+                year = pub_time[:4]
+                print(name_pdf,pub_time,pdf_url)
+                # result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time, com_name, num)
+                # num += 1
+                # if result:
+                #     # 公告信息列表
+                #     log.info(f'{short_name}==============解析传输操作成功')
+                #     state = 1
+                #     takeTime = baseCore.getTimeCost(start_time, time.time())
+                #     baseCore.recordLog(social_code, taskType, state, takeTime, href, '成功')
+                #     try:
+                #         # 发送kafka成功之后 再插入数据库
+                #         insertMysql(social_code, href, pub_time)
+                #         log.info(f"{social_code}==={href}===数据库插入成功")
+                #     except:
+                #         log.info(f"{social_code}==={href}===数据库插入失败")
+                #         continue
+                # else:
+                #     continue
+                # break
+                # time.sleep(0.5)
+                # except Exception as e:
+                #     ee = e.__traceback__.tb_lineno
+                #     log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
+                #     state = 0
+                #     takeTime = baseCore.getTimeCost(start_time, time.time())
+                #     baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
+                #     break
+                # except:
+                # log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
+                # state = 0
+                # takeTime = baseCore.getTimeCost(start_time, time.time())
+                # baseCore.recordLog(social_code, taskType, state, takeTime, url, f'获取信息列表失败')
+        next_flg = soup.select('#con02-7 > table > tr')[1].select('div')[2].text
+        if '下一页' not in next_flg:
+            break
+        page += 1
+        break
    # log.info(
    #     f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
......