Commit 79448081  Author: XveLingKun

5.31

Parent 721c31d7
# -*- coding: utf-8 -*-
+import ast
import json
import re
import time
@@ -13,7 +14,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import sys
# sys.path.append('D:\\KK\\zzsn_spider\\base')
-sys.path.append('D:\\kkwork\\zzsn_spider\\base')
+sys.path.append(r'D:\PycharmProjects\zzsn\base')
import BaseCore
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx
@@ -592,12 +593,13 @@ def login():
time.sleep(30)
return
id_cookie = cookieinfo[0]
-cookie_list = json.loads(cookieinfo[1])
+cookie_list = cookieinfo[1]
+cookie_list = ast.literal_eval(cookie_list)
# cookie_list = json.dumps(cookieinfo[1])
print(cookie_list)
# cookie_list= [{'domain': 'www.qcc.com', 'expiry': 1721815475, 'httpOnly': False, 'name': 'CNZZDATA1254842228', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': f'{cookie_["CNZZDATA1254842228"]}'}, {'domain': '.qcc.com', 'expiry': 1740650660, 'httpOnly': False, 'name': 'qcc_did', 'path': '/', 'sameSite': 'None', 'secure': True, 'value': 'bb480035-2a34-4270-9a8b-db8b7d9374b3'}, {'domain': '.qcc.com', 'expiry': 1706695474, 'httpOnly': True, 'name': 'QCCSESSID', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': 'ccf17b97219476a1faa8aaff79'}, {'domain': '.qcc.com', 'expiry': 1721815461, 'httpOnly': False, 'name': 'UM_distinctid', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': '18d3aed87f3552-01ba17134bcbe9-4c657b58-e1000-18d3aed87f4c5d'}, {'domain': 'www.qcc.com', 'expiry': 1706092459, 'httpOnly': True, 'name': 'acw_tc', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': '3d365a1c17060906591851865e848bfd116d30ed8d2ac3e144455c8ff8'}]
for cookie in cookie_list:
-cookie['expiry'] = int(cookie['expiry'])
+# cookie['expiry'] = int(cookie['expiry'])
# del cookie['expiry']
driver.add_cookie(cookie)
time.sleep(5)
...
@@ -50,7 +50,7 @@ class Token():
# 获取token
def getToken(self):
# cursor.execute(f"select id,cookies from QCC_token where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
-cursor.execute(f" select id, cookies from QCC_token where id = 63")
+cursor.execute(f" select id, cookies from QCC_token where id= 82 ")
# rows = cursor.fetchall()
# cnx.commit()
# if rows:
...
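The change above swaps json.loads for ast.literal_eval when restoring the stored cookies. A minimal sketch of why, assuming the cookie list was saved with str()/repr() (single quotes, Python True/False) rather than json.dumps; the `saved` value below is a hypothetical stored string, not data from the repo:

import ast
import json

# Cookies saved via str(driver.get_cookies()) are a Python literal, so json.loads
# rejects them (single quotes, False instead of false) while ast.literal_eval parses them.
saved = "[{'name': 'QCCSESSID', 'value': 'abc123', 'secure': False}]"  # hypothetical stored value

try:
    cookies = json.loads(saved)
except json.JSONDecodeError:
    cookies = ast.literal_eval(saved)

for cookie in cookies:
    print(cookie['name'], cookie['value'])  # in the spider, driver.add_cookie(cookie) runs here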
# -*- coding: utf-8 -*-
"""股东信息"""
import ast
import json
import re
import time
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import urllib3
from selenium.webdriver.support.wait import WebDriverWait
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import sys
# sys.path.append('D:\\KK\\zzsn_spider\\base')
sys.path.append(r'D:\PycharmProjects\zzsn\base')
import BaseCore
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
from classtool import Token, File, Tag
token = Token()
file = File()
tag = Tag()
from selenium import webdriver
def create_driver_():
path = r'D:\soft\msedgedriver.exe'
# options = webdriver.EdgeOptions()
options = {
"browserName": "MicrosoftEdge",
"ms:edgeOptions": {
"extensions": [], "args": ["--start-maximized"] # 添加最大化窗口运作参数
}
}
session = webdriver.Edge(executable_path=path, capabilities=options)
return session
from selenium.webdriver.chrome.service import Service
def create_driver():
chrome_driver = r'D:\cmd100\chromedriver.exe'
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
chrome_options.add_argument("--disable-javascript")
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
return driver
def login():
driver = create_driver()
url = 'https://www.qcc.com'
driver.get(url)
driver.maximize_window()
# time.sleep(10)
# cookie_list = driver.get_cookies()
cookieinfo = token.getToken()
if cookieinfo:
pass
else:
log.info('==========已无cookies==========')
time.sleep(30)
return
id_cookie = cookieinfo[0]
cookie_list = cookieinfo[1]
cookie_list = ast.literal_eval(cookie_list)
# cookie_list = json.dumps(cookieinfo[1])
print(cookie_list)
# cookie_list= [{'domain': 'www.qcc.com', 'expiry': 1721815475, 'httpOnly': False, 'name': 'CNZZDATA1254842228', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': f'{cookie_["CNZZDATA1254842228"]}'}, {'domain': '.qcc.com', 'expiry': 1740650660, 'httpOnly': False, 'name': 'qcc_did', 'path': '/', 'sameSite': 'None', 'secure': True, 'value': 'bb480035-2a34-4270-9a8b-db8b7d9374b3'}, {'domain': '.qcc.com', 'expiry': 1706695474, 'httpOnly': True, 'name': 'QCCSESSID', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': 'ccf17b97219476a1faa8aaff79'}, {'domain': '.qcc.com', 'expiry': 1721815461, 'httpOnly': False, 'name': 'UM_distinctid', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': '18d3aed87f3552-01ba17134bcbe9-4c657b58-e1000-18d3aed87f4c5d'}, {'domain': 'www.qcc.com', 'expiry': 1706092459, 'httpOnly': True, 'name': 'acw_tc', 'path': '/', 'sameSite': 'Lax', 'secure': False, 'value': '3d365a1c17060906591851865e848bfd116d30ed8d2ac3e144455c8ff8'}]
for cookie in cookie_list:
# cookie['expiry'] = int(cookie['expiry'])
del cookie['expiry']
driver.add_cookie(cookie)
time.sleep(5)
driver.refresh()
url_test = 'https://www.qcc.com/firm/a5f5bb3776867b3e273cd034d6fb4baa.html'
driver.get(url_test)
# driver.get('https://www.qcc.com/')
time.sleep(3)
return driver, id_cookie, cookie_list[0]
import pymongo
# 连接数据库
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'股东信息']
# 从数据库存储
def send2db(dic_info):
db_storage.insert_one(dic_info)
pass
if __name__ == '__main__':
taskType = '基本信息/企查查'
# driver, id_cookie, cookie_ = login()
# print(type(cookie_))
while True:
# nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
# file_name = f'./data/国内企业基本信息采集情况.xlsx'
# print(file_name)
# file.createFile(file_name)
# start_time = time.time()
# # 获取企业信息
# # company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
# company_field = '91310000MAD51BU44D||上海交易集团有限公司|||||||||||||1|中国内地|||||||'
# if company_field == 'end':
# # 本轮处理完毕,需要发送邮件,并且进入下一轮
# baseCore.sendEmail(file_name)
# time.sleep(20)
# file.deleteFile(file_name)
# continue
#
# if company_field == '' or company_field is None:
# # 本轮结束后没有新增的企业要采集
# file.deleteFile(file_name)
# flag = True
# while flag:
# log.info('--------已没有数据---------')
# time.sleep(30)
# if not baseCore.check_mysql_conn(cnx_):
# # 144数据库
# cnx_ = baseCore.cnx
# cursor_ = cnx_.cursor()
# log.info('===11数据库重新连接成功===')
# company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
# if company_field:
# flag = False
# log.info("-----已添加数据------")
# baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode',company_field)
# continue
# continue
# 拼接链接
#https://www.qcc.com/firm/3bc5dc687133ec9d6efdcd5e8e02ad4d.html
url = 'https://www.qcc.com/firm/3bc5dc687133ec9d6efdcd5e8e02ad4d.html'
# headers = {
# 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
# 'Accept-Encoding': 'gzip, deflate, br',
# 'Accept-Language': 'zh-CN,zh;q=0.9',
# 'Connection': 'keep-alive',
# # 'Cookie': 'qcc_did=046d99c9-566e-4046-9094-689901b79748; UM_distinctid=18aac5b8c21810-046f8431aecf58-26031f51-1fa400-18aac5b8c22efd; CNZZDATA1254842228=109635008-1695108795-https%253A%252F%252Fwww.qcc.com%252F%7C1695113473; _uab_collina=169935323766710839405007; acw_tc=db9062a717000200596487102e63dac7bed6aad2a049361c973816fabf; QCCSESSID=3c95642bd6445b7681c8fc6411',
# 'Cookie': f'qcc_did={cookie_["qcc_did"]}; acw_tc={cookie_["acw_tc"]}; QCCSESSID={cookie_["QCCSESSID"]}',
# 'Host': 'www.qcc.com',
# 'Referer': 'https://www.qcc.com/',
# 'Sec-Ch-Ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
# 'Sec-Ch-Ua-Mobile': '?0',
# 'Sec-Ch-Ua-Platform': '"Windows"',
# 'Sec-Fetch-Dest': 'document',
# 'Sec-Fetch-Mode': 'navigate',
# 'Sec-Fetch-Site': 'same-origin',
# 'Sec-Fetch-User': '?1',
# 'Upgrade-Insecure-Requests': '1',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
# }
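# NOTE (assumption): this draft never fetches or parses the page, so `soup` and `script_tag`
# below are undefined. They would normally come from something like:
#   soup = BeautifulSoup(driver.page_source, 'html.parser'); script_tag = soup.find_all('script')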
for tag in script_tag:
if 'window.__INITIAL_STATE__ =' in tag.text:
true_text = tag.text
print(true_text)
break
else:
continue
import re
scrip = re.findall('__INITIAL_STATE__ =(.*);', true_text)[0].strip()
reqJson = json.loads(scrip)
print(reqJson)
# try:
# gudong_tag = soup.find('div', class_="tablist tablist-ipopartner")
# except:
# print("没有股东信息")
# gudong_tag = ''
# if gudong_tag:
# table = gudong_tag.find('div', class_='app-tree-table')
# tr_tag = table.find_all('tr')[1]
# print(tr_tag)
# tr_tag = BeautifulSoup("""""", '') """
# <tr><td class="tx" rowspan="1">
# 1
# </td> <td class="left first-td has-son-td" colspan="2"><div class="td-coy ipo-partner-app-tdcoy"><span class="headimg"><img class="img-logo app-auto-logo" src="https://image.qcc.com/logo/97da01cb2e03145138747afc9babf8a2.jpg?x-oss-process=style/logo_200" style="width: 40px; height: 40px;"/></span> <span class="cont"><span class="upside-line"><span class="name"><a href="https://www.qcc.com/firm/97da01cb2e03145138747afc9babf8a2.html" target="_blank">中国东方航空集团有限公司</a></span> <span class="tail-tag"><span class="icon-link ntag-v2 app-war-link"><i aria-label="图标: icon-icon_guquanjiegou" class="war-icon qccdicon qccdicon-icon-icon_guquanjiegou aicon aicon-guquanjiegou"><svg aria-hidden="true" class="" fill="currentColor" focusable="false" height="1em" width="1em"><use xlink:href="#icon-icon_guquanjiegou"></use></svg></i> <!-- --></span></span></span> <div class="app-tdcoy-tags app-tags margin-type-default"> <span class="ntag text-primary text-brand-blue"><span>央企</span></span><span class="ntag text-primary text-vip-gold"><span>大股东</span></span> <!-- --></div> <!-- --> </span> <!-- --> </div> <div class="show-son-btn"><i aria-label="图标: icon-icon_jiahao" class="tianjia qccdicon qccdicon-icon-icon_jiahao aicon aicon-jiahao"><svg aria-hidden="true" class="" fill="currentColor" focusable="false" height="1em" width="1em"><use xlink:href="#icon-icon_jiahao"></use></svg></i></div></td><td class="center"><span>流通A股,限售流通A股</span> <!-- --></td><td class="right"><span class="no-wrap"><span>39.57%</span><!-- --></span> <!-- --></td><td class=""><div><span class="app-data-mask"><span class="vip-mask"></span></span></div> <!-- --></td><td class=""><span>41.9674%</span><span class="icon-link ntag-v2 app-war-link"><i aria-label="图标: icon-icon_guquanlian" class="war-icon qccdicon qccdicon-icon-icon_guquanlian aicon aicon-guquanlian"><svg aria-hidden="true" class="" fill="currentColor" focusable="false" height="1em" width="1em"><use xlink:href="#icon-icon_guquanlian"></use></svg></i> <!-- --></span> <!-- --></td><td class="left"><span>-</span> <!-- --></td></tr>
#
# """
# td_list = tr_tag.find_all('td')
# q = td_list[0].text()
# if q == '1':
# pass
# else:
# print('error')
#
# gudong_name = td_list[1].find('span', class_='name').text
# gudong_pro = td_list[1].find_all('span', class_='ntag')
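The main block above pulls shareholder data out of the page's window.__INITIAL_STATE__ script tag. A minimal, self-contained sketch of that extraction step, using a hard-coded HTML string in place of the real driver.page_source; the JSON shape ("company"/"partners") is an illustrative assumption, not the actual qcc.com payload:

import json
import re
from bs4 import BeautifulSoup

# Stand-in for driver.page_source / response.text in the real spider.
html = '<html><body><script>window.__INITIAL_STATE__ = {"company": {"partners": [{"name": "示例股东", "percent": "39.57%"}]}};</script></body></html>'

soup = BeautifulSoup(html, 'html.parser')
script_tag = soup.find_all('script')
for tag in script_tag:
    if 'window.__INITIAL_STATE__ =' in tag.text:
        true_text = tag.text
        break
else:
    raise RuntimeError('__INITIAL_STATE__ not found')

# Same regex as the spider: capture everything between "=" and the trailing ";".
scrip = re.findall('__INITIAL_STATE__ =(.*);', true_text)[0].strip()
reqJson = json.loads(scrip)
print(reqJson['company']['partners'][0]['name'], reqJson['company']['partners'][0]['percent'])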
"""模拟扫码登录""" """模拟扫码登录"""
import datetime
import json import json
import time import time
...@@ -20,14 +19,10 @@ baseCore = BaseCore() ...@@ -20,14 +19,10 @@ baseCore = BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
cnx_ = baseCore.cnx cnx_ = baseCore.cnx
cursor_ = baseCore.cursor cursor_ = baseCore.cursor
from selenium.webdriver.chrome.service import Service
db_storageInsert = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN[ def createDriver_():
'企查查登录信息'] path = r'D:\soft\msedgedriver.exe'
# path = r'F:\spider\117\msedgedriver.exe'
def createDriver():
# path = r'D:\soft\msedgedriver.exe'
path = r'F:\spider\117\msedgedriver.exe'
options = { options = {
"browserName": "MicrosoftEdge", "browserName": "MicrosoftEdge",
...@@ -42,11 +37,29 @@ def createDriver(): ...@@ -42,11 +37,29 @@ def createDriver():
session = webdriver.Edge(executable_path=path, capabilities=options) session = webdriver.Edge(executable_path=path, capabilities=options)
return session return session
def createDriver():
chrome_driver = r'D:\cmd100\chromedriver.exe'
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
# chrome_options.add_argument("--disable-javascript")
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
return driver
def flushAndGetToken(): def flushAndGetToken():
log.info('======刷新浏览器=====') log.info('======刷新浏览器=====')
browser.refresh() browser.refresh()
cookie_list = browser.get_cookies() cookie_list = browser.get_cookies()
cookies = {}
print(cookie_list)
# for cookie in cookie_list:
# cookies[cookie['name']] = cookie['value']
# print(cookies)
# return cookies
print(type(cookie_list))
return cookie_list return cookie_list
...@@ -86,3 +99,8 @@ if __name__ == "__main__": ...@@ -86,3 +99,8 @@ if __name__ == "__main__":
if flg == 'N' or flg == 'n': if flg == 'N' or flg == 'n':
break break
baseCore.close() baseCore.close()
import pandas as pd # https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2?_=1716444648296
# from pandas import DataFrame as df
import pymysql
import redis
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
with cnx.cursor() as cursor:
select = """select relationName, relationId from klb_company"""
cursor.execute(select)
results = cursor.fetchall()
for result in results:
name = result[0]
xydm = result[1]
item = f'{name}|{xydm}'
r.rpush('SousuoBaidu:companyname', item)
t = str(int(time.time()) * 1000)
url = f'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2?_=1716444648296'
payload = {"gid":f"{tycid}", "pageSize":10, "pageNum":1, "sortField":"", "sortType":"-100", "historyType":1}
ip = get_proxy()[random.randint(0, 3)]
res = requests.post(url, headers=headers, data=payload, proxies=ip, timeout=10)
json_info = res.json()
holder_info = json_info['data']['result'][0]
shareHolderName = holder_info['shareHolderName']
percent = holder_info['percent']
\ No newline at end of file
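The fragment above mixes a MySQL-to-Redis loop with an unfinished Tianyancha holderV2 request: tycid, headers, get_proxy and the time/random/requests imports never appear. A minimal, self-contained sketch of the holder lookup it seems to be aiming at; the function name, the json= body, and the idea that the caller supplies auth headers (X-AUTH-TOKEN/X-TYCID) are assumptions, not part of the commit:

import time
import requests

def fetch_top_holder(tycid, headers, proxies=None):
    # Millisecond timestamp as the cache-busting "_" query parameter.
    ts = int(time.time()) * 1000
    url = f'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2?_={ts}'
    payload = {"gid": f"{tycid}", "pageSize": 10, "pageNum": 1,
               "sortField": "", "sortType": "-100", "historyType": 1}
    # Assumption: the endpoint expects a JSON body (the repo's header dict sets Content-Type: application/json).
    res = requests.post(url, headers=headers, json=payload, proxies=proxies, timeout=10)
    json_info = res.json()
    holder_info = json_info['data']['result'][0]
    return holder_info['shareHolderName'], holder_info['percent']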
@@ -6,6 +6,7 @@ import time
import pymysql
import requests
+from retry import retry
sys.path.append('D:\\PycharmProjects\\zzsn\\base')
import BaseCore
@@ -14,16 +15,45 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
+# headers = {
+# 'Accept': 'application/json, text/plain, */*',
+# 'Accept-Language': 'zh-CN,zh;q=0.9',
+# 'Connection': 'keep-alive',
+# 'Content-Length': '32',
+# 'Content-Type': 'application/json',
+# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
+# 'version': 'TYC-Web'
+# }
headers = {
+'Accept-Language': 'zh-CN,zh;q=0.9',
+'Content-Type': 'application/json',
+'Connection': 'keep-alive',
+'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
+'version': 'TYC-Web'
+}
+header = {
'Accept': 'application/json, text/plain, */*',
-'Accept-Encoding': 'gzip, deflate, br',
+'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
-'Content-Length': '32',
+'Content-Length': '93',
'Content-Type': 'application/json',
-'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
+'Host': 'capi.tianyancha.com',
+'Origin': 'https://www.tianyancha.com',
+'Referer': 'https://www.tianyancha.com/',
+'Sec-Fetch-Dest': 'empty',
+'Sec-Fetch-Mode': 'cors',
+'Sec-Fetch-Site': 'same-site',
+'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
+'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYzNjcxMTc0NiIsImlhdCI6MTcxNDk1Njg3MywiZXhwIjoxNzE3NTQ4ODczfQ.qMEvtETT7RS3Rhwq9idu5H2AKMxc2cjtr5bDDW6C6yOFKR-ErgDwT4SOBX9PB2LWDexAG2hNaeAvn6swr-n6VA',
+'X-TYCID': 'dad485900fcc11ee8c0de34479b5b939',
+'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
+'sec-ch-ua-mobile': '?0',
+'sec-ch-ua-platform': '"Windows"',
'version': 'TYC-Web'
}
# cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# cursor= cnx.cursor()
@@ -31,36 +61,34 @@ cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '天眼查企业id/天眼查'
#根据信用代码获取天眼查id 企业名字等信息
+@retry(tries=5, delay=3)
def getTycIdByXYDM(com_name, s):
-retData={'state':False,'tycData':None,'reput':True}
+retData={'state':False, 'tycData':None, 'reput':True}
url=f"https://capi.tianyancha.com/cloud-tempest/search/suggest/v3?_={baseCore.getNowTime(3)}"
+# url=f"https://capi.tianyancha.com/cloud-tempest/search/suggest/v3"
ip = baseCore.get_proxy()
paramJsonData = {'keyword': com_name}
-try:
-# headers['User-Agent'] = baseCore.getRandomUserAgent()
-# headers['X-AUTH-TOKEN'] = baseCore.GetTYCToken()
-response = s.post(url,json=paramJsonData,headers=headers,verify=False, proxies=ip)
-# response = s.post(url, json=paramJsonData, headers=headers)
-time.sleep(random.randint(3, 5))
-retJsonData =json.loads(response.content.decode('utf-8'))
-if retJsonData['data'] and retJsonData['state'] == 'ok':
-pass
-else:
-log.error(f"---{com_name}-未查询到该企业---")
-retData['reput'] = False
-return retData
-matchType=retJsonData['data'][0]['matchType']
-if matchType =='公司名称匹配':
-retData['state'] = True
-retData['tycData'] = retJsonData['data'][0]
-response.close()
-return retData
-else:
-log.error(f"{com_name}------{retJsonData}")
-response.close()
-return retData
-except Exception as e:
-log.error(f"---{com_name}--{e}---")
+# response = requests.post(url=url, json=paramJsonData, headers=header, verify=False, proxies=ip)
+response = s.post(url, json=paramJsonData, headers=headers)
+time.sleep(random.randint(3, 5))
+retJsonData =json.loads(response.content.decode('utf-8'))
+if retJsonData['data'] and retJsonData['state'] == 'ok':
+pass
+else:
+log.error(f"---{com_name}-未查询到该企业---")
+retData['reput'] = False
+return retData
+matchType = retJsonData['data'][0]['matchType']
+if matchType == '信用代码匹配' or matchType == '公司名称匹配':
+retData['state'] = True
+retData['tycData'] = retJsonData['data'][0]
+response.close()
+return retData
+else:
+log.error(f"{com_name}------{retJsonData}")
+response.close()
return retData
...
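The rewrite above drops the function's own try/except and instead decorates it with @retry from the retry package imported at the top of the diff. A small sketch of how that decorator behaves, assuming the same package; the counter and the simulated failure are illustrative only:

from retry import retry

attempts = {'n': 0}

@retry(tries=5, delay=1)
def flaky_lookup():
    # Fails twice, succeeds on the third call; @retry re-invokes the function after each
    # exception and only re-raises once all 5 tries are exhausted.
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise ConnectionError('simulated timeout')
    return 'ok'

print(flaky_lookup(), 'after', attempts['n'], 'attempts')  # -> ok after 3 attempts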
# 根据信用代码获取天眼查id
import json
import random
import sys
import time
import pymysql
import requests
sys.path.append('D:\\PycharmProjects\\zzsn\\base')
import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
headers = {
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Content-Length': '32',
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'version': 'TYC-Web'
}
# cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# cursor= cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '天眼查企业id/天眼查'
#根据信用代码获取天眼查id 企业名字等信息
def getTycIdByXYDM(com_name, s):
retData={'state':False,'tycData':None,'reput':True}
url=f"https://capi.tianyancha.com/cloud-tempest/search/suggest/v3?_={baseCore.getNowTime(3)}"
ip = baseCore.get_proxy()
paramJsonData = {'keyword': com_name}
try:
# headers['User-Agent'] = baseCore.getRandomUserAgent()
# headers['X-AUTH-TOKEN'] = baseCore.GetTYCToken()
response = s.post(url,json=paramJsonData,headers=headers,verify=False, proxies=ip)
# response = s.post(url, json=paramJsonData, headers=headers)
time.sleep(random.randint(3, 5))
retJsonData =json.loads(response.content.decode('utf-8'))
if retJsonData['data'] and retJsonData['state'] == 'ok':
pass
else:
log.error(f"---{com_name}-未查询到该企业---")
retData['reput'] = False
return retData
matchType=retJsonData['data'][0]['matchType']
if matchType =='公司名称匹配':
retData['state'] = True
retData['tycData'] = retJsonData['data'][0]
response.close()
return retData
else:
log.error(f"{com_name}------{retJsonData}")
response.close()
return retData
except Exception as e:
log.error(f"---{com_name}--{e}---")
return retData
# 更新天眼查企业基本信息
def updateTycInfo():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
# social_code = '9111000066990444XF'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
continue
start = time.time()
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
# 数据重新塞入redis
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
xydm = data[2]
tycid = data[11]
if tycid == None or tycid == '':
try:
s = requests.Session()
retData = getTycIdByXYDM(xydm, s)
if retData['tycData'] and retData['reput']:
tycid = retData['tycData']['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
elif not retData['tycData'] and retData['reput']:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={social_code}====重新放入redis====')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
elif not retData['reput'] and not retData['tycData']:
continue
except Exception as e:
log.error(e)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
if __name__ == '__main__':
updateTycInfo()
\ No newline at end of file
import json
import threading
import time
import uuid
import redis
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
# 使用连接池
# cnx_ = baseCore.pool_11.connection()
# cursor_ = cnx_.cursor()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
lock = threading.Lock()
pathType = 'QYNotice/'
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name,pnum):
body = {
"_source": ["attachmentIds","createDate","sourceAddress","labels.relationId","title","year","publishDate","createDate"],
"query": {
"bool": {
"must": [
{
"term": {
"type.keyword": {
"value": "1"
}
}
},
{
"term": {
"origin.keyword": {
"value": "雪球网"
}
}
},
{
"range": {
"createDate": {
"gte": "2024-05-25T00:00:00"
}
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200,
"from": pnum
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.title',
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.origin'
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
# log.info(result)
return result
def main(page, p, esMethod):
redis_conn = redis.Redis(connection_pool=pool)
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# return
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
return
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
redis_conn.lrem(f'NianbaoOT:id', 0, id)
redis_conn.lpush(f'NianbaoOT:id', id)
def run_threads(num_threads,esMethod,j):
threads = []
for i in range(num_threads):
page = j + i + 1
p = j + i * 200
thread = threading.Thread(target=main, args=(page, p, esMethod))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
j = 0
for i in range(10):
esMethod = EsMethod()
# result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
# total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# break
start = time.time()
num_threads = 5
run_threads(num_threads, esMethod, j)
j += 1000
log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
\ No newline at end of file
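main() above refreshes each document id in the Redis list by calling lrem before lpush. A short sketch of that idiom, assuming a reachable local Redis instance for illustration: repeated runs stay idempotent because any existing copy of the value is removed before exactly one fresh copy is pushed.

import redis

def push_unique(r, key, value):
    # count=0 removes every occurrence of value, then a single fresh copy is pushed.
    r.lrem(key, 0, value)
    r.lpush(key, value)

r = redis.Redis(host='localhost', port=6379, db=0)  # assumption: local test instance
push_unique(r, 'NianbaoOT:id', 'doc-id-1')
push_unique(r, 'NianbaoOT:id', 'doc-id-1')  # the list still holds only one copy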
import json
import threading
import time
import uuid
import redis
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
# 使用连接池
# cnx_ = baseCore.pool_11.connection()
# cursor_ = cnx_.cursor()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
lock = threading.Lock()
pathType = 'QYNotice/'
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name,pnum):
body = {
"_source": ["attachmentIds", "createDate", "origin", "labels.relationId", "title", "year",
"publishDate"],
"query": {
"bool": {
"must": [
{
"term": {
"type.keyword": {
"value": "1"
}
}
},
{
"term": {
"year.keyword": {
"value": "2023"
}
}
}
],
"must_not": [
{
"term": {
"origin.keyword": {
"value": "SEC美国证券交易委员会"
}
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200,
"from": pnum
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.labels.relationId',
'hits.hits._source.year',
'hits.hits._source.createDate',
'hits.hits._source.origin'
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
# log.info(result)
return result
def main(page, p, esMethod):
redis_conn = redis.Redis(connection_pool=pool)
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# return
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
return
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
year = mms['_source']['year']
socialcode = mms['_source']['labels'][0]['relationId']
origin = mms['_source']['origin']
item = socialcode + "|" + year + "|" + origin
log.info(f'{id}--{year}--{origin}--{socialcode}---')
redis_conn.lrem(f'Nianbao:id', 0, item)
redis_conn.lpush(f'Nianbao:id', item)
def run_threads(num_threads,esMethod,j):
threads = []
for i in range(num_threads):
page = j + i + 1
p = j + i * 200
thread = threading.Thread(target=main, args=(page, p, esMethod))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
j = 0
for i in range(10):
esMethod = EsMethod()
# result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
# total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# break
start = time.time()
num_threads = 5
run_threads(num_threads, esMethod, j)
j += 1000
log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
\ No newline at end of file
import sys
sys.path.append(r'D:\PycharmProjects\zzsn\base')
import BaseCore
baseCore = BaseCore.BaseCore()
import pandas as pd
r = baseCore.r
key = 'Nianbao:id'
df = pd.read_excel(r'D:\kkwork\企业数据\企业年报.xlsx')
def main(key):
while True:
info = baseCore.redicPullData(key)
# info = '91330281784320546U|'
if info == None:
break
else:
pass
r.lpush('Nianbao:id_2', info)
social_code = info.split('|')[0]
# if df.loc[df['信用代码'] == social_code].astype(str).iloc[0]:
try:
row = df.loc[df['信用代码'] == social_code].astype(str).iloc[0]
# 现在我们想要在这一行中追加一个新列,例如'新列名',值为'新值'
new_column_name = '系统中是否有年报'
new_value = '2023'
# 在DataFrame中追加新列
df.loc[df['信用代码'] == social_code, new_column_name] = new_value
except:
continue
# break
df.to_excel(r'D:\kkwork\企业数据\企业年报.xlsx', index=False)
print('完成')
if __name__ == '__main__':
main(key)
import json
@@ -338,8 +338,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# 获取企业信息
-social_code = baseCore.redicPullData('AnnualEnterprise:zjh_socialCode')
-# social_code = '91100000100003962T'
+# social_code = baseCore.redicPullData('AnnualEnterprise:zjh_socialCode')
+social_code = '91340000719975888H'
if not social_code:
time.sleep(20)
continue
@@ -366,6 +366,7 @@ if __name__ == '__main__':
count += 1
runType = 'AnnualReportCount'
baseCore.updateRun(social_code, runType, count)
+break
cnx.close()
cursor_.close()
...
# -*- coding: utf-8 -*-
@@ -12,7 +12,7 @@ from datetime import datetime
from kafka import KafkaProducer
import sys
-sys.path.append('D:\\kkwork\\zzsn_spider\\base')
+sys.path.append(r'D:\PycharmProjects\zzsn\base')
import BaseCore
baseCore = BaseCore.BaseCore()
import requests, re, time, pymysql, fitz
@@ -175,7 +175,7 @@ def spider_annual_report(dict_info,num):
selects = cursor.fetchone()
if selects:
log.info(f'com_name:{com_name}、{year}已存在')
-continue
+return
else:
#上传文件至obs服务器
retData = baseCore.uptoOBS(pdf_url,name_pdf,1,social_code,pathType,taskType,start_time,'XueLingKun')
@@ -264,8 +264,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# 获取企业信息
-social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
-# social_code = '91440300192176077R'
+# social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode22')
+social_code = '91440200555570170B'
if not social_code:
time.sleep(20)
if not baseCore.check_mysql_conn(cnx):
...
@@ -19,7 +19,7 @@ from tempfile import NamedTemporaryFile
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
-sys.path.append('D:\\kkwork\\zzsn_spider\\base')
+sys.path.append('D:\\PycharmProjects\\zzsn\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
@@ -122,7 +122,7 @@ def zzcx():
#todo:将图片塞进去 新建一个new_tag
append_tag = png_.find_element(By.XPATH, './/div/div[1]')
driver.execute_script(
-"var newElement = document.createElement('img'); newElement.src = 'http://zzsn.luyuen.com" + path + "'; arguments[0].insertBefore(newElement, arguments[0].firstChild);",
+"var newElement = document.createElement('img'); newElement.src = 'http://obs.ciglobal.cn" + path + "'; arguments[0].insertBefore(newElement, arguments[0].firstChild);",
append_tag)
os.remove(file_path)
except:
@@ -153,7 +153,7 @@ def zzcx():
# todo:将图片塞进去 新建一个new_tag
# append_tag = u_png.find_element(By.XPATH, './/div')
driver.execute_script(
-"var newElement = document.createElement('img'); newElement.src = 'http://zzsn.luyuen.com" + path + "'; arguments[0].insertBefore(newElement, arguments[0].firstChild);",
+"var newElement = document.createElement('img'); newElement.src = 'http://obs.ciglobal.cn" + path + "'; arguments[0].insertBefore(newElement, arguments[0].firstChild);",
u_png)
os.remove(file_path)
@@ -182,13 +182,13 @@ def zzcx():
# todo:将图片塞进去 新建一个new_tag
driver.execute_script(
-"var newElement = document.createElement('img'); newElement.src = 'http://zzsn.luyuen.com" + path + "'; newElement.style.width = '50%'; newElement.style.position = 'relative'; newElement.style.float = 'left'; arguments[0].insertBefore(newElement, arguments[0].firstChild);",
+"var newElement = document.createElement('img'); newElement.src = 'http://obs.ciglobal.cn" + path + "'; newElement.style.width = '50%'; newElement.style.position = 'relative'; newElement.style.float = 'left'; arguments[0].insertBefore(newElement, arguments[0].firstChild);",
line_bar)
# #todo:创建清晰的图片标签
# driver.execute_script(f"""
# var img = new Image();
-# img.src = "http://zzsn.luyuen.com{path}"; // 替换为你的图片路径
+# img.src = "http://obs.ciglobal.cn{path}"; // 替换为你的图片路径
# img.onload = function() {{
# var canvas = document.createElement("canvas");
# canvas.width = img.width;
@@ -243,5 +243,5 @@ def zzcx():
if __name__ == "__main__":
pathType = 'PhotoDingzhi/'
-r = redis.Redis(host='114.115.236.206', port=6379, password='clbzzsn', db=5)
+r = redis.Redis(host='114.116.90.53', port=6380, password='clbzzsn', db=5)
zzcx()
\ No newline at end of file
# 核心工具包
import os
import random
import socket
import sys
import time
import langid
import logbook
import logbook.more
import zhconv
class BaseCore:
# 计算耗时
def getTimeCost(self, start, end):
seconds = int(end - start)
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
if (h > 0):
return "%d小时%d分钟%d秒" % (h, m, s)
elif (m > 0):
return "%d分钟%d秒" % (m, s)
elif (seconds > 0):
return "%d秒" % (s)
else:
ms = int((end - start) * 1000)
return "%d毫秒" % (ms)
# 当前时间格式化
# 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
# 2 : 010101120000 %y%m%d%H%M%S
# 时间戳 3:1690179526555 精确到秒
def getNowTime(self, type):
now_time = ""
if type == 1:
now_time = time.strftime("%Y-%m-%d %H:%M:%S")
if type == 2:
now_time = time.strftime("%y%m%d%H%M%S")
if type == 3:
now_time = int(time.time() * 1000)
return now_time
# 日志格式
def logFormate(self, record, handler):
formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
date=record.time, # 日志时间
level=record.level_name, # 日志等级
filename=os.path.split(record.filename)[-1], # 文件名
func_name=record.func_name, # 函数名
lineno=record.lineno, # 行号
msg=record.message # 日志内容
)
return formate
# 获取logger
def getLogger(self, fileLogFlag=True, stdOutFlag=True):
dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + ".log"
if not os.path.exists(dirname):
os.mkdir(dirname)
logbook.set_datetime_format('local')
logger = logbook.Logger(filename)
logger.handlers = []
if fileLogFlag: # 日志输出到文件
logFile = logbook.TimedRotatingFileHandler(os.path.join(dirname, filename), date_format='%Y-%m-%d',
bubble=True, encoding='utf-8')
logFile.formatter = self.logFormate
logger.handlers.append(logFile)
if stdOutFlag: # 日志打印到屏幕
logStd = logbook.more.ColorizedStderrHandler(bubble=True)
logStd.formatter = self.logFormate
logger.handlers.append(logStd)
return logger
# 获取随机的userAgent
def getRandomUserAgent(self):
return random.choice(self.__USER_AGENT_LIST)
# 字符串截取
def getSubStr(self, str, beginStr, endStr):
if beginStr == '':
pass
else:
begin = str.rfind(beginStr)
if begin == -1:
begin = 0
str = str[begin:]
if endStr == '':
pass
else:
end = str.rfind(endStr)
if end == -1:
pass
else:
str = str[0:end + 1]
return str
# 繁体字转简体字
def hant_2_hans(self, hant_str: str):
'''
Function: 将 hant_str 由繁体转化为简体
'''
return zhconv.convert(hant_str, 'zh-hans')
# 判断字符串里是否含数字
def str_have_num(self, str_num):
panduan = False
for str_1 in str_num:
ppp = str_1.isdigit()
if ppp:
panduan = ppp
return panduan
# 获得脚本进程PID
def getPID(self):
PID = os.getpid()
return PID
# 获取本机IP
def getIP(self):
IP = socket.gethostbyname(socket.gethostname())
return IP
# 检测语言
def detect_language(self, text):
# 使用langid.py判断文本的语言
result = langid.classify(text)
if result == '':
return 'cn'
if result[0] == '':
return 'cn'
return result[0]
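A minimal usage sketch of the BaseCore helpers defined above (the logger plus the timing and time-format utilities), assuming BaseCore.py is importable from the working directory as in the other scripts in this commit:

import time
import BaseCore

baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()          # writes to ./logs/<script>.log and to stderr

start = time.time()
log.info(f"now (type 1): {baseCore.getNowTime(1)}")   # 2001-01-01 12:00:00 style
log.info(f"now (type 3): {baseCore.getNowTime(3)}")   # millisecond timestamp
time.sleep(1)
log.info(f"took {baseCore.getTimeCost(start, time.time())}")  # e.g. "1秒"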
[mysql]
host=114.115.159.144
username=caiji
password=zzsn9988
database=jx_enterprise
[task]
# 每月的几号开始采集
# 格式:1,2,3/1
#每月的1、2、3号开始执行 或 每月的1号执行
day=15
# 几点开始执行
# 格式:12,13/12
# 12、13各执行一次 或 12点执行一次
hour=12
# 几分开始执行
#格式:0,30/0
# 整点、半点各执行一次 或 整点执行一次
minute=0
[interface]
# 设置接口的端口号
port=8000
\ No newline at end of file
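The [task] comments above describe day/hour/minute values that hold either a single field or a comma list (e.g. 1,2,3). A minimal sketch of feeding them into APScheduler's BlockingScheduler, which the neighbouring script imports; passing the raw strings straight through as cron fields is an assumption:

import configparser
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print('collection run')

config = configparser.ConfigParser()
config.read('config.ini', encoding='utf-8')

scheduler = BlockingScheduler()
# Cron fields accept comma-separated lists, so "1,2,3" and "15" both work unchanged.
scheduler.add_job(job, 'cron',
                  day=config.get('task', 'day'),
                  hour=config.get('task', 'hour'),
                  minute=config.get('task', 'minute'))
scheduler.start()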
import configparser
import datetime
import hashlib
import json
import time
import uuid
from urllib.parse import urlencode
import pandas as pd
import pymysql
import requests
from DBUtils.PooledDB import PooledDB
from apscheduler.schedulers.blocking import BlockingScheduler
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# 接口信息
appkey = '84959d1d-6afa-4c57-9fea-b75d5599f761'
secretKey = '2aebca76-2def-4f8e-961a-436eeb0828fd'
timeStamp = int(time.time()) * 1000
import configparser
import datetime
class ToMysql():
def __init__(self):
self.now = datetime.datetime.now().strftime('%Y-%m-%d')
self.config = configparser.ConfigParser()
self.config.read('config.ini', encoding='utf-8')
# 设置请求头信息
self.headers = {
'Auth-version': '2.0', # 指定接口验证版本
'appkey': appkey,
'timestamp': str(timeStamp),
'sign': self.md5Encode(appkey + str(timeStamp) + secretKey),
'Connection': 'keep-alive'
}
def md5Encode(self, srcStr):
'''计算字符串的md5值'''
m = hashlib.md5()
m.update(srcStr.encode('utf-8'))
return m.hexdigest()
def mysqlConnection(self):
self.pool = PooledDB(
creator=pymysql,
maxconnections=5,
mincached=2,
maxcached=5,
blocking=True,
host=self.config.get('mysql', 'host'),
port=3306,
user=self.config.get('mysql', 'username'),
password=self.config.get('mysql', 'password'),
database=self.config.get('mysql', 'database'),
charset='utf8mb4'
)
self.cnx = self.pool.connection()
self.cursor = self.cnx.cursor()
# 数据库断开连接
def mysqlClose(self):
self.cursor.close()
self.cnx.close()
self.pool.close()
# 调用接口,获取数据
def dataRequest(self, url, company):
# 调用接口
response = requests.get(url, headers=self.headers)
dataJson = response.json()
log.info(dataJson)
if dataJson['status'] == '201':
log.info(f'{company}===无结果')
result = None
elif dataJson['status'] == '207':
log.error(f'{company}===查询错误')
result = None
elif dataJson['status'] == '208':
log.error(f'{company}===参数名错误或参数为空')
result = None
elif dataJson['status'] == '216':
log.error(f'{company}===调用次数超过账户额度限制')
result = None
elif dataJson['status'] == '102':
log.error(f'{company}===账户余额不足')
result = None
elif dataJson['status'] != '200':
log.error(f'{company}===出错状态码{dataJson["status"]}')
result = None
else:
result = dataJson['data']
return result
def getOrgId(self, company):
sql = f'select ORG_ID from organization_id where ORG_NAME = "{company}"'
self.cursor.execute(sql)
try:
orgId = self.cursor.fetchone()[0]
self.cnx.commit()
return orgId
except:
orgId = str(uuid.uuid4())
sqlInsert = f'insert into organization_id(ORG_ID,ORG_NAME) values (%s,%s)'
self.cursor.execute(sqlInsert, (orgId, company))
self.cnx.commit()
return orgId
# 工商照面 1.41
def ORGANIZAION(self):
errorNameList = []
log.info('开始采集企业基本信息===接口1.41')
baseUrl = 'https://api.qixin.com/APIService/enterprise/getBasicInfo'
df = pd.read_excel('./监管企业名单_.xlsx', sheet_name='Sheet1')
companyList = df['单位名称']
for company in companyList:
company = company.strip()
sqlSelect = f"select * from organization_ where NAME='{company}' and CREATE_DATE='{self.now}'"
self.cursor.execute(sqlSelect)
self.cnx.commit()
is_insert = self.cursor.fetchone()
if is_insert:
log.info(f'{company}===已入库')
continue
# 请求参数
urlParams = {
'keyword': company
}
# 构造url
url = '{}?{}'.format(baseUrl, urlencode(urlParams))
companyData = self.dataRequest(url, company)
if not companyData:
errorNameList.append(company)
continue
_id = self.getOrgId(company)
url = f"https://www.qixin.com/company/{companyData['id']}"
name = companyData['name']
format_name = companyData['format_name']
econKind = companyData['econKind']
econKindCode = companyData['econKindCode']
registCapi = companyData['registCapi']
currency_unit = companyData['currency_unit']
type_new = companyData['type_new']
historyNames = companyData['historyNames']
historyNames_ = ''
for historyName in historyNames:
historyNames_ += f'{historyName},'
historyNames_ = historyNames_.rstrip(',')
address = companyData['address']
regNo = companyData['regNo']
scope = companyData['scope']
termStart = companyData['termStart']
termEnd = companyData['termEnd']
belongOrg = companyData['belongOrg']
operName = companyData['operName']
title = companyData['title']
startDate = companyData['startDate']
endDate = companyData['endDate']
checkDate = companyData['checkDate']
status = companyData['status']
new_status = companyData['new_status']
orgNo = companyData['orgNo']
creditNo = companyData['creditNo']
districtCode = companyData['districtCode']
actualCapi = companyData['actualCapi']
categoryNew = companyData['categoryNew']
domain = companyData['domain']
tags = companyData['tags']
tags_ = ''
for tag in tags:
tags_ += f'{tag},'
tags_ = tags_.rstrip(',')
revoke_reason = companyData['revoke_reason']
logout_reason = companyData['logout_reason']
revoke_date = companyData['revoke_date']
fenname = companyData['fenname']
sql = 'insert into organization_(CREDIT_NO,NAME,ECON_KIND,REGIST_CAPI,ID,TAGS,BELONG_ORG,STATUS,TERM_START,FORMAT_NAME,HISTORY_NAMES,REVOKE_DATE,END_DATE,REG_NO,ECON_KIND_CODE,DOMAIN,CATEGORY_NEW,ADDRESS,ORG_NO,DISTRICT_CODE,START_DATE,SCOPE,NEW_STATUS,OPER_NAME,TITLE,CHECK_DATE,ACTUAL_CAPI,TERM_END,CURRENCY_UNIT,REVOKE_REASON,TYEP_NEW,LOGOUT_REASON,FENNAME,URL,CREATE_DATE) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) '
try:
self.cursor.execute(sql, (
creditNo, name, econKind, registCapi, _id, tags_, belongOrg, status, termStart, format_name, historyNames_,
revoke_date, endDate, regNo, econKindCode, domain, categoryNew, address, orgNo, districtCode, startDate,
scope,
new_status, operName, title, checkDate, actualCapi, termEnd, currency_unit, revoke_reason, type_new,
logout_reason, fenname, url, self.now))
self.cnx.commit()
log.info(f'{name}===入库成功')
except:
log.info(f'{name}==={company}===有重复')
dfError = pd.DataFrame(errorNameList, columns=['单位名称'])
dfError.to_excel('./查询失败名单.xlsx', index=False)
if __name__ == '__main__':
toMysql = ToMysql()
toMysql.mysqlConnection()
toMysql.ORGANIZAION()
toMysql.mysqlClose()
@@ -160,6 +160,7 @@ def RequestUrl(url, payload, social_code,start_time):
# proxy = {'https': 'http://127.0.0.1:8888', 'http': 'http://127.0.0.1:8888'}
response = requests.post(url=url, headers=headers, data=payload, proxies=ip)
+# response = requests.post(url=url, data=payload)
response.encoding = response.apparent_encoding
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
@@ -372,9 +373,9 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
com_name = dic_info[1]
try:
soup = RequestUrl(url, payload, social_code, start_time)
-except:
+except Exception as e:
# 请求失败,输出错误信息
-log.error(f'请求失败:{url}')
+log.error(f'请求失败:{url}----{e}')
#重新放入redis
baseCore.rePutIntoR('NoticeEnterprise:gnqy_socialCode_add', social_code)
time.sleep(random.randint(60, 120))
@@ -494,16 +495,16 @@ if __name__ == '__main__':
num = 0
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
-'Accept-Encoding': 'gzip, deflate',
+# 'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
-'Cache-Control': 'no-cache',
+'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
-'Content-Length': '380',
+# 'Content-Length': '380',
'Content-Type': 'application/x-www-form-urlencoded',
-'Cookie': 'acw_tc=01c6049e16908026442931294e4d0b65d95e3ba93ac19993d151844ac6',
+'Cookie': 'acw_tc=2760825217168606497214655ec9cb62ffa696c5367ec9f402d2086a0287ae; tgw_l7_route=125d8c38fe1eb06650b04b0cc6f51270',
-'Host': 'eid.csrc.gov.cn',
+# 'Host': 'eid.csrc.gov.cn',
-'Origin': 'http://eid.csrc.gov.cn',
+# 'Origin': 'http://eid.csrc.gov.cn',
-'Pragma': 'no-cache',
+# 'Pragma': 'no-cache',
'Referer': 'http://eid.csrc.gov.cn/101111/index_1_f.html',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
...
# -*- coding: utf-8 -*-
'''
成功100 发送数据失败200 请求失败400 文章内容为空500 处理style标签失败700
'''
import datetime
import re, sys
import pymongo
import requests, time, random, json, pymysql, redis
import urllib3
from bs4 import BeautifulSoup
from obs import ObsClient
from kafka import KafkaProducer
from retry import retry
sys.path.append('D:\\zzsn_spider\\base')
import BaseCore
# todo 连接mongo
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'机器人分会']
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
# cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
rMonitor = redis.Redis(host='114.116.90.53', port=6380, password='clbzzsn', db=15)
@retry(tries=2, delay=5)
def sendMonitorKafka(dic_news):
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
kafka_result = producer.send("data_lifecycle_log_data_crawler",
json.dumps(dic_news, ensure_ascii=False).encode('utf-8'))
log.info('数据监控Kafka发送成功')
@retry(tries=3, delay=5)
def sendKafka(result_dict):
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
kafka_result = producer.send("crawlerInfo",
json.dumps(result_dict, ensure_ascii=False).encode('utf8'))
log.info(f"{result_dict['title']}===发送kafka成功!")
# 更新状态表
def updatewxLink(link, info_source_code, state):
updateSuccess = f"update wx_link set state= {state} where link='{link}' and info_source_code='{info_source_code}' "
cursor_.execute(updateSuccess)
cnx_.commit()
def getjsonInfo():
# todo:从redis中获取一条
linkid = baseCore.redicPullData('WeiXinGZH:linkid_yx')
if linkid:
pass
else:
log.info('-----没有数据了-----')
return False, False
# 从数据库中获取信息 一条
select_sql = f"select * from wx_link where state=0 and id= '{linkid}'"
cursor_.execute(select_sql)
row = cursor_.fetchone()
cnx_.commit()
if row:
pass
else:
log.info('-----没有数据了-----')
return False, False
dict_json = {
'sid': row[1],
'site_uri': row[2],
'site_name': row[3],
'info_source_code': row[4],
'title': row[5],
'publish_time': row[6],
'link': row[7]
}
# 拿到一条数据 更新状态
update_sql = f"update wx_link set state=1 where link='{row[7]}' and info_source_code='{row[4]}' "
cursor_.execute(update_sql)
cnx_.commit()
return dict_json, linkid
@retry(tries=20, delay=2)
def getrequest(url_news):
ip = baseCore.get_proxy()
res_news = requests.get(url_news, proxies=ip, timeout=20)
if res_news.status_code != 200:
raise
return res_news
def get_info(dict_json, linkid):
# list_all_info = []
# num_caiji = 0
kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
news_title = dict_json['title']
sid = dict_json['sid']
news_date = dict_json['publish_time']
origin = dict_json['site_name']
url_news = dict_json['link']
info_source_code = dict_json['info_source_code']
# while True:
# try:
# ip = baseCore.get_proxy()
# res_news = requests.get(url_news, proxies=ip, timeout=20)
# break
# except:
# time.sleep(3)
# 400请求失败
# updatewxLink(url_news, info_source_code, 400)
# return False
# 修改请求方法,retry 3次
try:
res_news = getrequest(url_news)
# print(res_news)
except:
# 修改回原状态,重新放入redis
updatewxLink(url_news, info_source_code, 0)
log.info(f'{origin}---{news_date}--{news_title}---请求失败-- 重新放入redis')
baseCore.rePutIntoR('WeiXinGZH:linkid_yx', linkid)
# try:
# res_news = requests.get(url_news, timeout=20)
# except:
# # 400请求失败
# updatewxLink(url_news, info_source_code, 400)
return False
soup_news = BeautifulSoup(res_news.content, 'html.parser')
if '无法查看' in soup_news.text or '该页面不存在' in soup_news.text or '该内容已被发布者删除' in soup_news.text or '此内容因违规无法查看' in soup_news.text or '该公众号已迁移' in soup_news.text or '此内容被多人投诉' in soup_news.text:
log.info(f'--errorCode:800--{origin}---{news_date}---{news_title}----此内容发送失败无法查看')
updatewxLink(url_news, info_source_code, 800)
return False
try:
news_html = soup_news.find('div', {'id': 'js_content'})
news_html['style'] = 'width: 814px ; margin: 0 auto;'
# del news_html['style']
news_html = rm_style_attr(news_html)
del news_html['id']
del news_html['class']
except Exception as e:
log.info(f'--errorCode:700--{url_news}-----------{e}')
# log.error(f'{url_news}-----{info_source_code}')
updatewxLink(url_news, info_source_code, 0)
log.info(f'{origin}---{news_date}--{news_title}---style标签解析失败---重新放入redis')
baseCore.rePutIntoR('WeiXinGZH:linkid_yx', linkid)
return False
try:
news_content = news_html.text
except:
log.info(f'--------内容为空--------{url_news}--------')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
news_title,
url_news,
news_html,
'文章内容为空',
time_now
]
insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
updatewxLink(url_news, info_source_code, 500)
return False
list_img = news_html.find_all('img')
for num_img in range(len(list_img)):
img_one = list_img[num_img]
url_src = img_one.get('data-src')
# print(url_src)
if url_src and 'gif' in url_src:
url_img = ''
img_one.extract()
else:
try:
try:
name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
except:
img_one.extract()
continue
try:
res = requests.get(url_src, timeout=20)
except:
img_one.extract()
continue
resp = None
for i in range(10):
try:
resp = obsClient.putContent('zzsn', name_img, content=res.content)
break
except:
time.sleep(2)
if resp:
pass
else:
img_one.extract()
continue
try:
# log.info(f'resp:{resp}')
url_img = resp['body']['objectUrl']
str_url_img = f'<img src="{url_img}">'
except Exception as e:
log.info(f'--errorCode:300--{url_news}-----------{e}')
updatewxLink(url_news, info_source_code, 300)
return False
try:
img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
except Exception as e:
log.info(f'--errorCode:300--{url_news}-----------{e}')
updatewxLink(url_news, info_source_code, 300)
return False
except Exception as e:
log.info(f'--errorCode:600--{url_news}-----------{e}')
updatewxLink(url_news, info_source_code, 600)
return False
for tag in news_html.descendants:
try:
del tag['style']
except:
pass
list_section = news_html.find_all('section')
for section in list_section:
section.name = 'div'
uniqueCode = baseCore.getUniqueCode('WX', '231')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info = {
'sid': sid,
'title': news_title,
'content': news_content,
'contentWithtag': str(news_html),
'summary': '',
'author': '',
'origin': origin,
'publishDate': news_date,
'sourceAddress': url_news,
'source': '11',
'createDate': time_now,
'uniqueCode': uniqueCode,
}
try:
# todo:数据不发送kafka,直接存入mongo
db_storage.insert_one(dic_info)
except:
log.info('------数据存储失败------')
updatewxLink(url_news, info_source_code, 200)
return False
updatewxLink(url_news, info_source_code, 100)
return True
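# State codes written to wx_link by updatewxLink() in get_info() above:
#   0   - request failed or js_content/style parsing failed; the link is re-queued to redis for another attempt
#   100 - article collected and stored successfully
#   200 - mongo insert failed
#   300 - image URL replacement failed
#   500 - article body was empty
#   600 - other error while downloading or uploading images
#   800 - article no longer viewable (deleted, violation, account migrated, etc.)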
def rm_style_attr(soup):
# 查找所有含有style属性的标签
style_tags = soup.find_all(style=True)
# 遍历每个style标签
for style_tag in style_tags:
try:
# 使用正则表达式替换
styleattr = style_tag['style']
styleattr = re.sub(r'(?s)visibility:.+?;', '', styleattr)
styleattr = re.sub(r'(?s)font-family:.+?;', '', styleattr)
styleattr = re.sub(r'(?s)color:.+?;', '', styleattr)
styleattr = re.sub(r'(?s)font-size:.+?;', '', styleattr)
style_tag['style'] = styleattr
except:
continue
# first_div = soup.select('div[id="js_content"]')
# # 设置style属性
# first_div['style'] = 'width: 814px ; margin: 0 auto;'
first_div = soup.select('div[id="js_content"]')
if first_div:
first_div = first_div[0] # 获取第一个匹配的元素
first_div['style'] = 'width: 814px ; margin: 0 auto;' # 设置style属性
return soup
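# Minimal illustrative sketch of rm_style_attr (kept commented out, not part of the collection flow):
# from bs4 import BeautifulSoup
# demo = BeautifulSoup('<div id="js_content"><p style="color:red;font-size:12px;line-height:2;">hi</p></div>', 'html.parser')
# print(rm_style_attr(demo))
# # color/font-size/font-family/visibility declarations are stripped, other declarations such as line-height survive,
# # and the js_content div gets the fixed "width: 814px ; margin: 0 auto;" style.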
if __name__ == "__main__":
num_caiji = 0
list_all_info = []
while True:
# 一次拿取一篇文章
# todo: 从redis拿数据 更新mysql状态
dict_json, linkid = getjsonInfo()
try:
if dict_json:
if get_info(dict_json, linkid):
num_caiji = num_caiji + 1
log.info(f'-----已采集{num_caiji}篇文章---来源{dict_json["site_name"]}----')
else:
time.sleep(20)
continue
except Exception as e:
baseCore.rePutIntoR('WeiXinGZH:linkid_yx', linkid)
log.info(f'-----{linkid}--{e}失败---重新塞入redis----')
baseCore.close()
# 微信采集列表数据
# 微信采集列表数据
import json
import time
import random
import pymysql
import redis
import requests
import urllib3
from pymysql.converters import escape_string
import sys
sys.path.append('D:\\zzsn_spider\\base')
from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
s = requests.session()
cnx = pymysql.connect(host="114.116.44.11", user="caiji", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
r = baseCore.r
def resHtml(token, url, cookies):
try:
ip = baseCore.get_proxy()
s = requests.session()
cookie_jar = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
s.cookies = cookie_jar
# json_search = s.get(url, headers=headers, proxies=ip, verify=False).json()
json_search = s.get(url, headers=headers, proxies=ip, verify=False).json()
aa = s.cookies.get_dict()
updateCookieToken(token, json.dumps(aa))
except Exception as e:
json_search = {}
return json_search
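# resHtml() runs the list request through a fresh session seeded from the stored cookie dict and, on success,
# saves the refreshed cookies back to weixin_tokenCookies via updateCookieToken; on any exception it returns an
# empty dict, in which case the caller's json_search['base_resp'] lookup will raise a KeyError.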
# 采集失败的公众号 重新放入redis
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
# 获取公众号信息
def getSourceInfo(infoSourceCode):
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
cursor.execute(sql)
row = cursor.fetchone()
cnx.commit()
dic_url = {
'url_': row[0],
'sid': row[1],
'name': row[2],
'info_source_code': row[3],
'biz': ''
}
url_ = dic_url['url_']
origin = dic_url['name']
info_source_code = dic_url['info_source_code']
sid = dic_url['sid']
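# The biz id is cut out of the configured site_uri; a hypothetical example of the expected shape would be
# https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzA4MTIzNDU2Nw==&scene=124 (illustrative only).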
try:
biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
dic_url['biz'] = biz
except Exception as e:
log.info(f'---公众号--{origin}---biz错误')
error = [
origin,
url_,
info_source_code,
str(e),
'biz错误'
]
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return False
return dic_url
# 保存错误日志
def insertBadSql(error):
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
# 保存文章列表数据
def insertWxList(dic_url, json_search, page, user_name):
list_all_news = json_search['app_msg_list']
listCount = 0
repetCount = 0
insertCount = 0
for one_news in list_all_news:
listCount = listCount + 1
news_title = one_news['title']
timestamp = one_news['update_time']
time_local = time.localtime(timestamp)
news_date = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
url_news = one_news['link']
selectCountSql = f"select count(1) from wx_link where link='{escape_string(url_news)}'"
cursor_.execute(selectCountSql)
count = cursor_.fetchone()[0]
if count > 0:
repetCount = repetCount + 1
continue
else:
insertCount = insertCount + 1
try:
insertSql = f"insert into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) values " \
f"('{dic_url['sid']}','{dic_url['url_']}','{dic_url['name']}','{dic_url['info_source_code']}','{escape_string(news_title)}','{escape_string(news_date)}','{escape_string(url_news)}',0,now())"
cursor_.execute(insertSql)
cnx_.commit()
except Exception as e:
log.error(f"保存数据库失败:{e}")
# 查询放入之后的id
selectIdSql = f"select id from wx_link where sid='{dic_url['sid']}' and link='{escape_string(url_news)}'"
cursor_.execute(selectIdSql)
linkid = cursor_.fetchone()[0]
# todo: 放入redis
try:
r.ping()
except:
r_ = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
r_.lpush('WeiXinGZH:url_yx', linkid)
continue
r.lpush('WeiXinGZH:linkid_yx', linkid)
log.info(f"---{dic_url['name']}--第{page}页----总数:{listCount}---重复数:{repetCount}---新增数:{insertCount}-----使用账号{user_name}--------")
# if listCount == 0:
# 列表为空认为结束
# return True
# if repetCount >= listCount / 2:
# 重复数量大于等于一半认为结束
# return True
# 没有结束
return False
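# insertWxList() currently always returns False ("not finished"); the commented-out heuristics above show the
# intended stop conditions (empty page, or at least half of the page already present in wx_link).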
# token的处理
def updateTokeen(token, type):
if type == 2:
# session失效,删除token
cursor_.execute(f"delete from weixin_tokenCookies where token={token}")
if type == 1:
# 封号了 修改封号时间
cursor_.execute(f"update weixin_tokenCookies set fenghao_time=now() where token={token}")
if type == 3:
# 正常使用 更新使用时间
cursor_.execute(f"update weixin_tokenCookies set update_time=now() where token={token}")
cnx_.commit()
# token的处理
def updateCookieToken(token, cookies):
cursor_.execute(f"update weixin_tokenCookies set cookies='{escape_string(cookies)}' where token={token}")
cnx_.commit()
# 获取token
def getToken():
cursor_.execute(
f"select token, cookies, user_name from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
row = cursor_.fetchall()
cnx_.commit()
if row:
pass
else:
# 没有查到token
log.info("没有拿到token")
return False
return row[0]
# 获取列表数据
def getPageData(dic_url, page, dic_user_count):
url_ = dic_url['url_']
origin = dic_url['name']
info_source_code = dic_url['info_source_code']
biz = dic_url['biz']
fakeid = biz + '=='
tokenAndCookie = getToken()
if tokenAndCookie:
pass
else:
log.info("没有拿到token,开始递归")
while True:
log.info("没有拿到token,开始休眠")
time.sleep(60)
log.info("没有拿到token,结束休眠")
tokenAndCookie = getToken()
if tokenAndCookie:
break
user_name = tokenAndCookie[2]
token = tokenAndCookie[0]
log.info(f"获取token到----{token}----{user_name}")
cookies = json.loads(tokenAndCookie[1])
# s.cookies.update(cookies)
url = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={(page - 1) * 5}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# reponse = s.get(url, headers=headers, proxies=ip, verify=False)
# json_search = reponse.json()
# newcookies = requests.utils.dict_from_cookiejar(reponse.cookies, cookiejar=None, overwrite=True)
# s.cookies = newcookies
# updateCookieToken(token,json.dumps(s.cookies))
# 调用方法
json_search = resHtml(token, url, cookies)
str_t = json.dumps(json_search)
ret = json_search['base_resp']['ret']
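# base_resp.ret values handled below: 0 = OK, 200013 = account temporarily banned (fenghao_time is stamped),
# 200002 = invalid biz parameter, 200003 = invalid session, 200074 = account not logged in; any other value is
# logged as "other error" and the token record is deleted.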
if ret == 0:
if user_name in dic_user_count:
dic_user_count[user_name] += 1
else:
dic_user_count[user_name] = 1
elif ret == 200013:
log.info(f'======{origin}-----{biz}----{user_name}账号被封=======')
# 封号修改token
updateTokeen(token, 1)
return getPageData(dic_url, page, dic_user_count)
elif ret == 200002:
log.info(f'======{origin}-----{biz}----该公众号号biz错误,请检查=======')
error = [origin, url_, info_source_code, str_t, '无效biz参数']
insertBadSql(error)
return True, dic_user_count
elif ret == 200003:
log.info(f'======{origin}-----{biz}----{user_name}账号无效session=======')
# session失效修改token
updateTokeen(token, 2)
error = [origin, url_, info_source_code, str_t, '无效session']
insertBadSql(error)
return getPageData(dic_url, page, dic_user_count)
elif ret == 200074:
# {"base_resp": {"ret": 200074, "err_msg": "default"}}
log.info(f'======{origin}-----{biz}----{user_name}账号未登录成功=======')
# session失效修改token
updateTokeen(token, 2)
error = [origin, url_, info_source_code, str_t, f'{user_name}账号未登录成功']
insertBadSql(error)
return getPageData(dic_url, page, dic_user_count)
else:
log.info(f'======{origin}-----{biz}----{user_name}账号其他错误=======')
error = [origin, url_, info_source_code, str_t, '其他错误']
insertBadSql(error)
updateTokeen(token, 2)
return True, dic_user_count
# 修改token使用时间
updateTokeen(token, 3)
# 保存数据到数据库
return insertWxList(dic_url, json_search, page, user_name), dic_user_count
# 获取微信公众号数据
def getWxList(infoSourceCode, dic_user_count):
dic_url = getSourceInfo(infoSourceCode)
log.info(f"======{infoSourceCode}----开始采集=======")
if dic_url:
pass
else:
log.info(f'======{infoSourceCode}---------该公众号biz错误,请检查=======')
error = ['', '', infoSourceCode, '', '该公众号biz错误']
insertBadSql(error)
return
origin = dic_url['name']
biz = dic_url['biz']
for page in range(1, 118):
retFlag, dic_user_count = getPageData(dic_url, page, dic_user_count)
time.sleep(random.randint(60, 181))
if retFlag:
# 结束 跳出该公众号
break
else:
# 没有结束
pass
log.info(f"======{origin}-----{biz}----结束采集=======")
def getFromSql():
selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(selectSql)
results = cursor.fetchall()
cnx.commit()
result_list = [item[0] for item in results]
time.sleep(2)
# 放入redis
for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item)
# 获取redis中的公众号个数
def getnumber_redis():
length = r.llen('WeiXinGZH:infoSourceCode')
return length
if __name__ == "__main__":
# numbers = getnumber_redis()
# #if numbers>0:
# #pass
# #else:
# #getFromSql()
# #time.sleep(2)
# #numbers = getnumber_redis()
# log.info("当前批次采集公众号个数{}".format(numbers))
# time.sleep(3)
dic_user_count = {}
# start = time.time()
# log.info(f"开始时间{baseCore.getNowTime(1)}")
# while True:
# infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
# if infoSourceCode == 'None' or infoSourceCode == None:
# log.info("redis已经没有数据了,需要重新放置数据")
# log.info(f"采集完一轮公众号耗时{baseCore.getTimeCost(start, time.time())}")
# getFromSql()
# # time.sleep(60)
# # numbers = getnumber_redis()
# # log.info("当前批次采集公众号个数{}".format(numbers))
# break
# # infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
# # continue
infoSourceCode = 'IN-20240521-0002'
getWxList(infoSourceCode, dic_user_count)
# if dic_user_count:
# for key, value in dic_user_count.items():
# log.info(f"====账号{key},使用次数{value}")
#
# # infoSourceCode = 'IN-20220917-0159'
# # getWxList(infoSourceCode)
import execjs
js = execjs.compile(open(r'D:\PycharmProjects\zzsn\douyin\static\dy.js', 'r', encoding='gb18030').read())
if __name__ == '__main__':
data = 'device_platform=webapp&aid=6383&channel=channel_pc_web&publish_video_strategy_type=2&source=channel_pc_web&sec_user_id=MS4wLjABAAAADtPlZR0GJ11ox3X04rzqaBel7L441QHPVoJA8jISv9Q&pc_client_type=1&version_code=170400&version_name=17.4.0&cookie_enabled=true&screen_width=1707&screen_height=1067&browser_language=zh-CN&browser_platform=Win32&browser_name=Edge&browser_version=117.0.2045.47&browser_online=true&engine_name=Blink&engine_version=117.0.0.0&os_name=Windows&os_version=10&cpu_core_num=20&device_memory=8&platform=PC&downlink=10&effective_type=4g&round_trip_time=50&webid=7372020108170167871&msToken=DrQxAShB824nYAbtbpl31BDIxRN4WyGfdyjHWGxJbqozTVhJcuCU8kxxT7HUZttUFJjzft1NqmFXpe0-GW59wC9eRxS6CS24x2YTDIkSIAoqzWbzyLP46cwmh0iHQTuo&'
xs = js.call('get_dy_xb',data)
print(xs)
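# Assumption: dy.js is expected to expose a get_dy_xb(query_string) helper that produces the signed Douyin
# request parameter (an X-Bogus style value); only the compiled JS file actually defines what it returns.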
# 获取详情页
# 获取详情页
import time
import redis
from bs4 import BeautifulSoup
r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
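# Note: the helpers below (getDetailmsg / getProcessitem / itemInsertToTable) read like excerpts from a crawler
# class and a ranking script: they reference self.extractorMsg, self.rmTagattr, self.getNowDate, self.searchkw,
# self.sid and module-level names such as item, bangdan_name, connMysql, logger and closeSql that are not defined
# in this fragment, so they will not run standalone without that surrounding context.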
def getDetailmsg( detailmsg):
try:
detailurl = detailmsg['detailUrl']
title = detailmsg['title']
content, contentWithTag = self.extractorMsg(detailurl, title)
contentWithTag = self.rmTagattr(contentWithTag, detailurl)
except Exception as e:
content = ''
contentWithTag = ''
currentdate = self.getNowDate()
kword = self.searchkw
publishDate = detailmsg['publishTag']
publishDate = publishDate + ''
# publishtime=self.paserTime(publishtime)
# publishDate=publishtime.strftime("%Y-%m-%d %H:%M:%S")
detailmsg = {
'title': detailmsg['title'],
'source': detailmsg['sourceTag'],
'detailurl': detailurl,
'content': content,
'contentHtml': contentWithTag,
'publishtime': publishDate,
'currentdate': currentdate,
'kword': kword
}
return detailmsg
def getProcessitem(bdetail):
nowDate = self.getNowDate()
content = bdetail['content']
if content != '':
processitem = {
"sid": self.sid,
"source": "4",
"title": bdetail['title'],
"content": bdetail['content'],
"contentWithtag": bdetail['contentHtml'],
"origin": bdetail['source'],
"publishDate": bdetail['publishtime'],
"sourceAddress": bdetail['detailurl'],
"createDate": nowDate
}
return processitem
# 将列表数据插入到表中 baidu_search_result
def itemInsertToTable(items):
itemdata = []
conx, cursorM = connMysql()
companyinfo = item
social_code = str(companyinfo.split('|')[0])
ch_name = companyinfo.split('|')[1]
en_name = companyinfo.split('|')[2]
rank = bangdan_name + '|' + str(companyinfo.split('|')[3])
for item in items:
nowtime = getNowDate()
data = (social_code, en_name, ch_name, rank, item['title'], item['content'], item['detailurl'], item['publishtime'], item['source'], nowtime)
itemdata.append(data)
sql = "INSERT into Company_layoff_copy1 (企业信用代码,企业英文名称,企业中文名称,所在榜单排名,标题,内容,链接,发布时间,来源,创建时间) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s)"
cursorM.executemany(sql, itemdata)
logger.info("数据插入数据库成功!")
# 定义插入数据的SQL语句
# 执行插入操作
conx.commit()
closeSql(conx, cursorM)
def get_detail_html():
......
...@@ -451,22 +451,164 @@ def aaaaa(final_output):
finall_list.append(result)
print(finall_list)
def paserTime(publishtime):
timeType = ['年前', '月前', '周前', '前天', '昨天', '天前', '今天', '小时前', '分钟前']
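# paserTime() normalises Chinese relative dates ("3天前", "昨天", "2周前", ...) and absolute dates
# ("2023年10月5日", "10月5日") into datetime objects; "今天"/"小时前"/"分钟前" are approximated as now minus
# 5 hours, months/years are treated as 30/365 days, and anything it cannot parse is returned unchanged.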
current_datetime = datetime.datetime.now()
publishtime = publishtime.strip()
print(publishtime)
try:
if '年前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day = int(numbers[0])
delta = datetime.timedelta(days=365 * day)
publishtime = current_datetime - delta
elif '月前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day = int(numbers[0])
delta = datetime.timedelta(days=30 * day)
publishtime = current_datetime - delta
elif '周前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day = int(numbers[0])
delta = datetime.timedelta(weeks=day)
publishtime = current_datetime - delta
elif '天前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day = int(numbers[0])
delta = datetime.timedelta(days=day)
publishtime = current_datetime - delta
elif '前天' in publishtime:
delta = datetime.timedelta(days=2)
publishtime = current_datetime - delta
elif '昨天' in publishtime:
current_datetime = datetime.datetime.now()
delta = datetime.timedelta(days=1)
publishtime = current_datetime - delta
elif '今天' in publishtime or '小时前' in publishtime or '分钟前' in publishtime:
delta = datetime.timedelta(hours=5)
publishtime = current_datetime - delta
elif '年' in publishtime and '月' in publishtime:
time_format = '%Y年%m月%d日'
publishtime = datetime.datetime.strptime(publishtime, time_format)
elif '月' in publishtime and '日' in publishtime:
current_year = current_datetime.year
time_format = '%Y年%m月%d日'
publishtime = str(current_year) + '年' + publishtime
publishtime = datetime.datetime.strptime(publishtime, time_format)
except Exception as e:
print('时间解析异常!!')
return publishtime
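# Illustrative calls (assuming the formats above): paserTime('3天前') -> datetime three days before now;
# paserTime('10月5日') -> October 5th of the current year.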
if __name__ == '__main__':
# # aaa = aaaaa(same_list)
# #
# # for i in range(len(same_list)):
# #     print(i, same_list[i])
# #
# # isHandleSuccess, handleMsg = True, "success"
# # for i in range(3):
# #     if i <= 3:
# #         HandleSuccess, handleMsg = True, "success"
# #     else:
# #         HandleSuccess, handleMsg = False, "error"
# #     print(i, HandleSuccess, handleMsg)
# import re
# import time
#
# import pandas as pd
# import pymongo
# import redis
#
# r = redis.StrictRedis(host='114.115.221.202', port=6379, db=1, decode_responses=True, password='clbzzsn')
# db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
# '裁员数据']
# # typeList = ['2023年福布斯','2022年福布斯','独角兽','世界500','欧盟']
# # for type in typeList:
# # par = re.compile(type)
# # datas = db_storage.find({'flg': False, '内容-翻译': ''})
# # for data in datas:
# # r.rpush('translation:downsiz', str(data['_id']))
# # print(data)
# dataList = []
# par = re.compile('独角兽')
# datas = db_storage.find({'flg':True,'所在榜单排名':par})
# for data in datas:
# del data['_id']
# del data['flg']
# dataList.append(data)
# pd.DataFrame(dataList).to_excel('./独角兽.xlsx')
# from base import BaseCore
# basecore = BaseCore.BaseCore()
# header = {
# 'Accept': 'application/json, text/plain, */*',
# 'Accept-Language': 'zh-CN,zh;q=0.9',
# 'Connection': 'keep-alive',
# 'Content-Type': 'application/json',
# 'Sec-Fetch-Dest': 'empty',
# 'Sec-Fetch-Mode': 'cors',
# 'Sec-Fetch-Site': 'same-site',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
# 'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYzNjcxMTc0NiIsImlhdCI6MTcxNDk1Njg3MywiZXhwIjoxNzE3NTQ4ODczfQ.qMEvtETT7RS3Rhwq9idu5H2AKMxc2cjtr5bDDW6C6yOFKR-ErgDwT4SOBX9PB2LWDexAG2hNaeAvn6swr-n6VA',
# 'X-TYCID': 'dad485900fcc11ee8c0de34479b5b939',
# 'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
# 'sec-ch-ua-mobile': '?0',
# 'sec-ch-ua-platform': '"Windows"',
# 'version': 'TYC-Web'
# }
# header = {
# # 'Accept': 'application/json, text/plain, */*',
# 'Accept-Language': 'zh-CN,zh;q=0.9',
# 'Connection': 'keep-alive',
# 'Content-Type': 'application/json',
# # 'Sec-Fetch-Dest': 'empty',
# # 'Sec-Fetch-Mode': 'cors',
# # 'Sec-Fetch-Site': 'same-site',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
# 'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzYzNjcxMTc0NiIsImlhdCI6MTcxNDk1Njg3MywiZXhwIjoxNzE3NTQ4ODczfQ.qMEvtETT7RS3Rhwq9idu5H2AKMxc2cjtr5bDDW6C6yOFKR-ErgDwT4SOBX9PB2LWDexAG2hNaeAvn6swr-n6VA',
# 'X-TYCID': 'dad485900fcc11ee8c0de34479b5b939',
# # 'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
# # 'sec-ch-ua-mobile': '?0',
# # 'sec-ch-ua-platform': '"Windows"',
# 'version': 'TYC-Web'
# }
# ip = basecore.get_proxy()
# url = 'https://capi.tianyancha.com/cloud-listed-company/listed/holder/hk?&date=&gid=2348871426&sortField=&sortType=-100&pageSize=10&pageNum=1&percentLevel=-100&keyword='
# # url = 'https://capi.tianyancha.com/cloud-listed-company/listed/holder/topTen?_=1716458307394&type=1&gid=4845825&sortField=&sortType=-100&pageSize=10&pageNum=1'
# # url = 'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2?_=1716534254189'
# # payload = {"gid":"2350084808","pageSize":10,"pageNum":1,"sortField":"","sortType":"-100","historyType":1,"percentLevel":"-100","keyword":""}
# # req = requests.post(url=url, headers=header, data=json.dumps(payload), proxies=ip)
# req = requests.get(url=url, headers=header, proxies=ip)
# print(req.json())
# req.close()
# headers = {
# 'Accept':
# 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
# 'Accept-Encoding': 'gzip, deflate',
# 'Accept-Language': 'zh-CN,zh;q=0.9',
# 'Cache-Control': 'max-age=0',
# 'Connection': 'keep-alive',
# 'Content-Length': '367',
# 'Content-Type': 'application/x-www-form-urlencoded',
# 'Cookie':
# 'acw_tc=2760825217168606497214655ec9cb62ffa696c5367ec9f402d2086a0287ae; tgw_l7_route=125d8c38fe1eb06650b04b0cc6f51270',
# 'Host': 'eid.csrc.gov.cn',
# 'Origin': 'http://eid.csrc.gov.cn',
# 'Referer': 'http://eid.csrc.gov.cn/101812/index_f.html',
# 'Upgrade-Insecure-Requests': '1',
# 'User-Agent':
# 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36'
# }
# payload = {'prodType': '002598', 'prodType2': '代码/简称/拼音缩写 ', 'keyWord': '', 'keyWord2': '关键字', 'startDate': '',
# 'startDate2': '请输入开始时间', 'endDate': '', 'endDate2': '请输入结束时间', 'selCatagory2': '10057', 'selBoardCode0': '',
# 'selBoardCode': ''}
# req = requests.get(url='http://eid.csrc.gov.cn/101812/index_2_f.html', headers=headers, data=payload)
# print(req.status_code)
publish_time = '2023年10月5日 '
aaa = paserTime(publish_time)
print(aaa)