Commit ec0f06d3 Author: 薛凌堃

Merge remote-tracking branch 'origin/master'

@@ -25,18 +25,15 @@ headers = {
     'Cache-Control': 'no-cache',
     'Pragma': 'no-cache'
 }
-taskType = '新浪财经/天眼查'
+taskType = '企业动态/新浪财经'
 # 获取企业信息
 def getinfomation(social_code):
-    selectSql = f"select * from mgzqjywyh_list where state = '2' and xydm='{social_code}' "
+    selectSql = f"select * from mgzqyjwyh_list where state = '2' and xydm='{social_code}' "
     cursor.execute(selectSql)
     data = cursor.fetchone()
-    cnx.commit()
     data = list(data)
-    cursor.close()
-    cnx.close()
     return data
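
Note: getinfomation interpolates social_code into the SQL text with an f-string. A minimal parameterized sketch of the same lookup, assuming a pymysql-style connection; the connection settings below are placeholders, not the project's real ones:

import pymysql

def getinfomation(cnx, social_code):
    # Same lookup as above, but with a %s placeholder so the driver escapes the value.
    select_sql = "select * from mgzqyjwyh_list where state = '2' and xydm = %s"
    with cnx.cursor() as cursor:
        cursor.execute(select_sql, (social_code,))
        data = cursor.fetchone()
    return list(data) if data else None

# Usage sketch; host/user/password/database are placeholders.
# cnx = pymysql.connect(host='127.0.0.1', user='root', password='***', database='caiji', charset='utf8mb4')
# row = getinfomation(cnx, 'ZZSN22080900000010')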
@@ -66,13 +63,19 @@ def getDic(social_code, li):
         contentText = smart.extract_by_url(href_).text
         # 不带标签正文
         content = smart.extract_by_url(href_).cleaned_text
-        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        if content == '':
+            log.error(f'{href}===页面解析失败')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
+            return 0
     except:
         log.error(f'{href}===页面解析失败')
         state = 0
         takeTime = baseCore.getTimeCost(start_time, time.time())
         baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
-        return
+        return 0
+    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
     dic_news = {
         'attachmentIds': '',
         'author': author,
@@ -110,6 +113,7 @@ def getDic(social_code, li):
         state = 0
         takeTime = baseCore.getTimeCost(start_time, time.time())
         baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
+    return 1
 # 数据发送至Kafka
 @retry(tries=3, delay=1)
@@ -157,64 +161,73 @@ def doJob():
 def doJob():
     # while True:
-    # social_code = ''
-    # # 从redis中获取企业信用代码
-    # try:
-    #     data = getinfomation(social_code)
-    #     com_code = data[6]
-    com_code = 'AAPL'
-    social_code = 'ZZSN22080900000004'
-    log.info(f'{social_code}==={com_code}===开始采集')
+    num_ok = 0
+    num_error = 0
+    # 从redis中获取企业信用代码
+    # social_code = baseCore.redicPullData('NewsEnterprise:nyse_socialCode')
+    social_code = 'ZZSN22080900000010'
     start_time = time.time()
-    pageIndex = 1
-    while True:
-        # 拼接链接
-        # url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1'
-        url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
-        soup_home = getrequests(url)
-        li_list = soup_home.select('body > div > div.xb_news > ul > li')
-        # 有可能第一次获取的li标签列表为空
-        for i in range(5):
-            if len(li_list) == 0:
-                li_list = soup_home.select('body > div > div.xb_news > ul > li')
-            else:
-                break
-        for li in li_list:
-            title = li.find('a').text
-            if title == '':
-                continue
-            href = li.find('a').get('href')
-            selects = selectUrl(href, social_code)
-            if selects:
-                log.info(f'{url}==已采集过')
-            else:
-                getDic(social_code, li)
-            break
-        # # 如果采集到已采集过动态,证明最新发布动态已经全部采集过
-        # 增量使用
-        # if selects:
-        #     break
-        next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
-        for i in range(5):
-            if len(next) == 0:
-                next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
-            else:
-                break
-        if len(next) == 2:
-            break
-        pageIndex += 1
-        time.sleep(2)
-    log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time,time.time())}')
-    # except:
-    #     log.info(f'==={social_code}=====获取企业信息失败====')
-    #     #重新塞入redis
-    #     baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
-    #     state = 0
-    #     takeTime = baseCore.getTimeCost(start, time.time())
-    #     baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
-    #     time.sleep(5)
+    try:
+        data = getinfomation(social_code)
+        com_code = data[3]
+        log.info(f'{social_code}==={com_code}===开始采集')
+        pageIndex = 1
+        while True:
+            # 拼接链接
+            # url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1'
+            url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
+            soup_home = getrequests(url)
+            li_list = soup_home.select('body > div > div.xb_news > ul > li')
+            # 有可能第一次获取的li标签列表为空
+            for i in range(5):
+                if len(li_list) == 0:
+                    li_list = soup_home.select('body > div > div.xb_news > ul > li')
+                else:
+                    break
+            for li in li_list:
+                title = li.find('a').text
+                if title == '':
+                    continue
+                href = li.find('a').get('href')
+                selects = selectUrl(href, social_code)
+                if selects:
+                    log.info(f'{url}==已采集过')
+                else:
+                    flg = getDic(social_code, li)
+                    if flg == 1:
+                        num_ok += 1
+                    else:
+                        num_error += 1
+            # # 如果采集到已采集过动态,证明最新发布动态已经全部采集过
+            # 增量使用
+            if selects:
+                break
+            next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
+            for i in range(5):
+                if len(next) == 0:
+                    next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
+                else:
+                    break
+            if len(next) == 2:
+                break
+            if len(next) == 1:
+                text_flag = next[0].text
+                if text_flag != '下页':
+                    break
+            pageIndex += 1
+            time.sleep(2)
+        log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
+    except Exception as e:
+        print(e)
+        log.info(f'==={social_code}=====获取企业信息失败====')
+        # 重新塞入redis
+        baseCore.rePutIntoR('NewsEnterprise:nyse_socialCode', social_code)
+        state = 0
+        takeTime = baseCore.getTimeCost(start_time, time.time())
+        baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
+        time.sleep(5)
 if __name__ == "__main__":
     doJob()
+    baseCore.close()
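
Note: the reworked loop above stops paging when the pager block yields exactly two links, or a single link whose text is not '下页'. The same test, pulled out as a standalone sketch; the helper name is hypothetical, and page_links stands for the list returned by soup_home.select('body > div > div.xb_news > div.xb_pages > a'):

def is_last_page(page_links):
    # Mirrors the conditions in the loop above: two pager links, or a single
    # link that is not '下页', are treated as "no next page".
    if len(page_links) == 2:
        return True
    if len(page_links) == 1 and page_links[0].text != '下页':
        return True
    return False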
@@ -30,33 +30,68 @@ headers = {
 taskType = '企业动态/新浪财经/国内'
 pattern = r"\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}"
 # 获取响应页面
 @retry(tries=3, delay=1)
 def getrequests(url):
     ip = baseCore.get_proxy()
-    req = requests.get(url, headers=headers,proxies=ip)
+    req = requests.get(url, headers=headers, proxies=ip)
     req.encoding = req.apparent_encoding
     soup = BeautifulSoup(req.text, 'html.parser')
     return soup
-# 解析内容
+# 页面内容解析
+def getContent(href):
+    soup = getrequests(href)
+    if 'http://stock.finance' in href:
+        div_content = soup.find('div', class_='blk_container')
+        content = div_content.text
+        contentWithTag = str(div_content)
+    else:
+        div_content = soup.find('div', attrs={'id': 'artibody'})
+        div_list = div_content.find_all('div')
+        for div in div_list:
+            try:
+                if div.get('class')[0] != 'img_wrapper':
+                    div.decompose()
+            except:
+                div.decompose()
+        script_list = div_content.find_all('script')
+        for script in script_list:
+            script.decompose()
+        style_list = div_content.find_all('style')
+        for style in style_list:
+            style.decompose()
+        img_list = div_content.find_all('img')
+        content = div_content.text.lstrip().strip()
+        contentWithTag = str(div_content)
+        for img in img_list:
+            img_src = img.get('src')
+            if 'https' not in img_src:
+                contentWithTag = contentWithTag.replace(img_src, f'https:{img_src}')
+    return content, contentWithTag
+# 获取解析内容并发送
 def getDic(social_code, title, href, pub_time):
     start_time = time.time()
-    if 'http' not in href:
-        href = 'https://finance.sina.com.cn' + href
-    href_ = href.replace('https', 'http')
+    pattern = re.compile(r'[\u4e00-\u9fa5]+')
     try:
-        # 带标签正文
-        contentText = smart.extract_by_url(href_).text
-        # 不带标签正文
-        content = smart.extract_by_url(href_).cleaned_text
+        content, contentWithTag = getContent(href)
         if content == '':
             log.error(f'{href}===页面解析失败')
             state = 0
             takeTime = baseCore.getTimeCost(start_time, time.time())
             baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
             return 0
+        matches = re.findall(pattern, content)
+        if len(matches) == 0:
+            log.error(f'{href}===页面解析乱码')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析乱码')
+            return 0
     except:
         log.error(f'{href}===页面解析失败')
         state = 0
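
Note: the new getContent keeps only the image wrappers inside the article body, strips script and style tags, and prefixes scheme-less image URLs with https:. A condensed, self-contained sketch of the same cleanup; the function name and timeout are illustrative, while the selectors come from the diff:

import requests
from bs4 import BeautifulSoup

def extract_article(href, headers=None):
    req = requests.get(href, headers=headers, timeout=10)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'html.parser')
    body = soup.find('div', attrs={'id': 'artibody'})
    if body is None:
        return '', ''
    # Drop every nested <div> except image wrappers, then all <script>/<style> tags.
    for div in body.find_all('div'):
        if 'img_wrapper' not in (div.get('class') or []):
            div.decompose()
    for tag in body.find_all(['script', 'style']):
        tag.decompose()
    content = body.text.strip()
    content_with_tag = str(body)
    # src values without a scheme (e.g. "//host/path") get an explicit https: prefix,
    # matching the diff's 'https' not in img_src check.
    for img in body.find_all('img'):
        src = img.get('src') or ''
        if src and 'https' not in src:
            content_with_tag = content_with_tag.replace(src, f'https:{src}')
    return content, content_with_tag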
@@ -68,7 +103,7 @@ def getDic(social_code, title, href, pub_time):
         'attachmentIds': '',
         'author': '',
         'content': content,
-        'contentWithTag': contentText,
+        'contentWithTag': contentWithTag,
         'createDate': time_now,
         'deleteFlag': '0',
         'id': '',
@@ -84,12 +119,11 @@ def getDic(social_code, title, href, pub_time):
         'socialCreditCode': social_code,
         'year': pub_time[:4]
     }
-    # print(dic_news)
     try:
         sendKafka(dic_news, start_time)
         log.info(f'Kafka发送成功')
         try:
-            insertMysql(social_code, href)
+            insertMysql(social_code, href, pub_time, title, content)
             log.info(f'数据库保存成功')
         except:
             log.error(f'{href}===数据入库失败')
@@ -126,14 +160,17 @@ def sendKafka(dic_news, start_time):
 # 数据保存入库,用于判重
 @retry(tries=3, delay=1)
-def insertMysql(social_code, link):
-    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
+def insertMysql(social_code, link, pub_time, title, content):
+    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,content,create_time) values(%s,%s,%s,%s,%s,%s,%s,now())'''
     # 动态信息列表
     list_info = [
         social_code,
         link,
         '新浪财经',
         '2',
+        pub_time,
+        title,
+        content[0:500]
     ]
     cursor.execute(insert_sql, tuple(list_info))
     cnx.commit()
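
Note: insertMysql now also records publish_time, title, and the first 500 characters of the plain-text content in the dedup table. A minimal standalone sketch of the same insert; cursor and cnx are assumed to be the module-level pymysql handles used elsewhere in the file:

INSERT_SQL = (
    "insert into brpa_source_article"
    "(social_credit_code, source_address, origin, type, publish_time, title, content, create_time) "
    "values (%s, %s, %s, %s, %s, %s, %s, now())"
)

def save_article(cursor, cnx, social_code, link, pub_time, title, content):
    # Truncate content to 500 characters, matching the diff's content[0:500].
    cursor.execute(INSERT_SQL, (social_code, link, '新浪财经', '2', pub_time, title, content[:500]))
    cnx.commit()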
@@ -175,7 +212,7 @@ def doJob():
                continue
            page = 1
            num_ok = 0
-            num_error =0
+            num_error = 0
            while True:
                url = f'http://vip.stock.finance.sina.com.cn/corp/view/vCB_AllNewsStock.php?symbol={gpdm_}&Page={page}'
                soup = getrequests(url)
@@ -184,7 +221,7 @@ def doJob():
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
-                    r.rpush('NewsEnterprise:gnqy_nyse_socialCode',social_code)
+                    r.rpush('NewsEnterprise:gnqy_nyse_socialCode', social_code)
                    time.sleep(1800)
                    break
                try:
@@ -197,14 +234,16 @@ def doJob():
                        if title == '':
                            continue
                        href = a_list[i].get('href')
-                        selects = selectUrl(href,social_code)
+                        selects = selectUrl(href, social_code)
                        if selects:
                            log.info(f'{href}===已采集')
                            continue
                        if 'http' not in href:
                            href = 'https://finance.sina.com.cn' + href
                        pub_time = time_list[i].replace('\xa0', ' ') + ":00"
-                        flg = getDic(social_code,title,href,pub_time)
+                        if 'https://cj.sina' not in href and 'https://t.cj.sina' not in href:
+                            print(href)
+                            flg = getDic(social_code, title, href, pub_time)
                        if flg == 0:
                            num_error += 1
                        else:
@@ -216,7 +255,7 @@ def doJob():
                        state = 0
                        takeTime = baseCore.getTimeCost(start_time, time.time())
                        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'信息采集失败==原因:{ee}行 {e}')
-                        break
+                        # break
                    except:
                        log.error(f"{social_code}==={gpdm}===第{page}页获取信息列表失败")
                        state = 0
@@ -226,9 +265,11 @@ def doJob():
                if '下一页' not in next_flg:
                    break
                page += 1
-                break
-            log.info(f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
+                # break
+            log.info(
+                f'{social_code}==={gpdm}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}===成功{num_ok}条,失败{num_error}条')
+            break
 if __name__ == "__main__":
......
@@ -55,24 +55,51 @@ def getrequests(url):
     soup = BeautifulSoup(req.text, 'html.parser')
     return soup
+# 页面内容解析
+def getContent(href):
+    soup = getrequests(href)
+    div_content = soup.find('div', attrs={'id': 'artibody'})
+    div_list = div_content.find_all('div')
+    for div in div_list:
+        try:
+            if div.get('class')[0] != 'img_wrapper':
+                div.decompose()
+        except:
+            div.decompose()
+    script_list = div_content.find_all('script')
+    for script in script_list:
+        script.decompose()
+    style_list = div_content.find_all('style')
+    for style in style_list:
+        style.decompose()
+    img_list = div_content.find_all('img')
+    content = div_content.text.lstrip().strip()
+    contentWithTag = str(div_content)
+    for img in img_list:
+        img_src = img.get('src')
+        if 'https' not in img_src:
+            contentWithTag = contentWithTag.replace(img_src, f'https:{img_src}')
+    return content, contentWithTag
-# 解析内容
+# 获取解析内容并发送
 def getDic(social_code, title, href, pub_time):
     start_time = time.time()
-    if 'http' not in href:
-        href = 'https://finance.sina.com.cn' + href
-    href_ = href.replace('https', 'http')
+    pattern = re.compile(r'[\u4e00-\u9fa5]+')
     try:
-        # 带标签正文
-        contentText = smart.extract_by_url(href_).text
-        # 不带标签正文
-        content = smart.extract_by_url(href_).cleaned_text
+        content, contentWithTag = getContent(href)
         if content == '':
             log.error(f'{href}===页面解析失败')
             state = 0
             takeTime = baseCore.getTimeCost(start_time, time.time())
             baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
             return 0
+        matches = re.findall(pattern, content)
+        if len(matches) == 0:
+            log.error(f'{href}===页面解析乱码')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析乱码')
+            return 0
     except:
         log.error(f'{href}===页面解析失败')
         state = 0
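
Note: both collectors now treat an extraction result with no Chinese characters as mojibake and skip the article. The check in isolation, as a minimal sketch:

import re

CHINESE_RUN = re.compile(r'[\u4e00-\u9fa5]+')

def looks_garbled(text):
    # No CJK run at all in a Chinese-language article usually means the page
    # was decoded with the wrong charset, so the caller logs 页面解析乱码 and returns 0.
    return len(CHINESE_RUN.findall(text)) == 0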
@@ -84,7 +111,7 @@ def getDic(social_code, title, href, pub_time):
         'attachmentIds': '',
         'author': '',
         'content': content,
-        'contentWithTag': contentText,
+        'contentWithTag': contentWithTag,
         'createDate': time_now,
         'deleteFlag': '0',
         'id': '',
@@ -100,23 +127,22 @@ def getDic(social_code, title, href, pub_time):
         'socialCreditCode': social_code,
         'year': pub_time[:4]
     }
-    # print(dic_news)
-    # try:
-    #     sendKafka(dic_news, start_time)
-    #     log.info(f'Kafka发送成功')
-    #     try:
-    #         insertMysql(social_code, href)
-    #         log.info(f'数据库保存成功')
-    #     except:
-    #         log.error(f'{href}===数据入库失败')
-    #         state = 0
-    #         takeTime = baseCore.getTimeCost(start_time, time.time())
-    #         baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
-    # except:
-    #     log.error(f'{href}===发送Kafka失败')
-    #     state = 0
-    #     takeTime = baseCore.getTimeCost(start_time, time.time())
-    #     baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
+    try:
+        sendKafka(dic_news, start_time)
+        log.info(f'Kafka发送成功')
+        try:
+            insertMysql(social_code, href, pub_time)
+            log.info(f'数据库保存成功')
+        except:
+            log.error(f'{href}===数据入库失败')
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
+    except:
+        log.error(f'{href}===发送Kafka失败')
+        state = 0
+        takeTime = baseCore.getTimeCost(start_time, time.time())
+        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
     return 1
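
Note: sendKafka itself is not shown in this diff. A plausible shape for it, given the @retry(tries=3, delay=1) decorator used throughout these files, would be a kafka-python producer send; the broker address and topic name below are placeholders, not the project's real values:

import json
from kafka import KafkaProducer
from retry import retry

# Placeholder broker and topic; the real configuration is not part of this commit.
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'],
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
)

@retry(tries=3, delay=1)
def sendNews(dic_news):
    # Send the assembled dic_news dict and block until the broker acknowledges it.
    producer.send('enterprise_news', value=dic_news).get(timeout=10)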
@@ -142,14 +168,17 @@ def sendKafka(dic_news, start_time):
 # 数据保存入库,用于判重
 @retry(tries=3, delay=1)
-def insertMysql(social_code, link):
-    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
+def insertMysql(social_code, link, pub_time, title, content):
+    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,content,create_time) values(%s,%s,%s,%s,%s,%s,%s,now())'''
     # 动态信息列表
     list_info = [
         social_code,
         link,
         '新浪财经',
         '2',
+        pub_time,
+        title,
+        content[:500]
     ]
     cursor.execute(insert_sql, tuple(list_info))
     cnx.commit()
@@ -174,9 +203,9 @@ def doJob():
            data = baseCore.getInfomation(social_code)
            gpdm = data[3]
            log.info(f'{social_code}==={gpdm}===开始采集')
-            # if gpdm == '' or not gpdm:
-            #     log.error(f'{social_code}===股票代码为空')
-            #     continue
+            if gpdm == '' or not gpdm:
+                log.error(f'{social_code}===股票代码为空')
+                continue
            gpdm_ = gpdm.split('.')[0]
            if len(gpdm_) != 5:
                gpdm_ = gpdm_.zfill(5)
@@ -191,7 +220,7 @@ def doJob():
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===ip封禁')
-                    # r.rpush('NewsEnterprise:xgqy_nyse_socialCode',social_code)
+                    r.rpush('NewsEnterprise:xgqy_nyse_socialCode', social_code)
                    time.sleep(1800)
                    break
                next_flg = soup.find('div', class_='part02').text
@@ -212,13 +241,12 @@ def doJob():
                        log.info(f'{href}===已采集过')
                        continue
                    pub_time = format_time(li.find('span').text)
-                    print(title)
                    flag = getDic(social_code, title, href, pub_time)
                    if flag == 1:
                        num_ok += 1
                    else:
                        num_error += 1
-                    time.sleep(0.5)
+                    time.sleep(1)
            except Exception as e:
                ee = e.__traceback__.tb_lineno
                log.error(f'{social_code}===信息采集失败==原因:{ee}行 {e}')
......