提交 c2749092 作者: 刘伟刚

天眼查测试代码

上级 f7814e8e
# -*- coding: utf-8 -*-
# 智能采集请求
# 1、考虑:请求智能采集时,不再使用实体类
# a. 仍使用:通过HTTP的 raw 请求体,直接传递HTML源文件,通过query参数传递 lang-code、link-text 参数
# b. 原因:在 postman 中,不方便进行测试,无法使用粘贴后的HTML源文件
# 2、不考虑:使用实体类,利大于弊
# a. 使用实体类,方便扩展参数字段
# b. 方便展示接口文档:调用 json_parameter_utility.get_json_parameters 函数,可显示请求实体类
class ExtractionRequest:
    """Intelligent-extraction request entity.

    An entity class is used (rather than a raw HTML body plus query params)
    because it is easy to extend with new fields and shows up nicely in the
    generated API docs via json_parameter_utility.get_json_parameters.
    """

    # Language code; needed when extracting non-Chinese articles.
    lang_code = ""
    # Link text; used for title extraction — accuracy drops without it.
    link_text = ""
    # Article page HTML source; used to extract title, publish time, body, etc.
    article_html = ""

    @staticmethod
    def from_dict(dictionary: dict):
        """Build an ExtractionRequest, copying every dict entry onto the instance."""
        request = ExtractionRequest()
        # Alternative considered: request.__dict__.update(dictionary)
        for name, value in dictionary.items():
            setattr(request, name, value)
        return request

    def to_dict(self):
        """Return a snapshot of the instance attributes as a plain dict.

        Needed when serializing to JSON, e.g.:
        json.dumps(obj, default=ExtractionRequest.to_dict)
        """
        return dict(self.__dict__)
# Extraction result entity.
class ExtractionResult:
    """Holds the fields produced by the article extractor."""

    # Title
    title = ""
    # Publish date
    publish_date = ""
    # Body text (keeps all HTML tags, e.g. br, img)
    text = ""
    # URL
    url = ""
    # Summary / meta description
    meta_description = ""
    # Clean body text (HTML stripped)
    cleaned_text = ""
    # Source (currently only supported for Chinese sites)
    # source = ""
    # Top image (top_image never captured anything; attribute retired)
    # top_image = ""

    def to_dict(self):
        """Return a snapshot of the instance attributes as a plain dict.

        Needed when serializing to JSON, e.g.:
        json.dumps(extraction_result, default=ExtractionResult.to_dict)
        """
        return dict(self.__dict__)
class UrlPickingRequest:
    """Request entity for picking article URLs off a list page."""

    # Response URL of the list page.
    # 1. Serves as the base URL for joining extracted relative URLs.
    # 2. The base URL must be the *response* URL.
    # 3. Example: with Python requests, use resp.url after requests.get(url).
    list_page_resp_url = ""
    # List page HTML source; used to extract article URLs.
    list_page_html = ""

    @staticmethod
    def from_dict(dictionary: dict):
        """Build a UrlPickingRequest, copying every dict entry onto the instance."""
        request = UrlPickingRequest()
        for name, value in dictionary.items():
            setattr(request, name, value)
        return request

    def to_dict(self):
        """Return a snapshot of the instance attributes as a plain dict.

        Needed when serializing to JSON, e.g.:
        json.dumps(obj, default=UrlPickingRequest.to_dict)
        """
        return dict(self.__dict__)
# Look up the Tianyancha (tianyancha.com) company id by unified social credit code.
import json
import random
import time
import pymysql
import requests
from base.BaseCore import BaseCore
import urllib3

# Requests are made with verify=False below, so silence the TLS warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore()
log = baseCore.getLogger()
# Request headers for capi.tianyancha.com.
# NOTE(review): X-AUTH-TOKEN is a hard-coded JWT with an embedded expiry —
# it must be rotated regularly or requests will start failing; confirm the
# refresh process (the commented-out GetTYCToken call below suggests one exists).
headers = {
    'Accept': 'application/json, text/plain, */*',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Connection': 'keep-alive',
    'Content-Length': '32',
    'Content-Type': 'application/json',
    'Host': 'capi.tianyancha.com',
    'Origin': 'https://www.tianyancha.com',
    'Referer': 'https://www.tianyancha.com/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
    'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxODcwMzc1MjYwMCIsImlhdCI6MTY5NzE5MDMwMywiZXhwIjoxNjk5NzgyMzAzfQ.awXuS-59RzK35r0aUJq4Rj83JzyAOvsdUfL_ojp66CVQMjlLv_ZDD9g5gCoZKE21LN1JYRMLNZhuWsHhxapROw',
    'X-TYCID': '6f6298905d3011ee96146793e725899d',
    'sec-ch-ua': '"Google Chrome";v="117", "Not;A=Brand";v="8", "Chromium";v="117"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'version': 'TYC-Web'
}
# Earlier header set, kept for reference:
# headers = {
# 'X-TYCID':'30c1289042f511ee9182cd1e1bcaa517',
# # 'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzU5MjQ4MTgzOSIsImlhdCI6MTY5MjkzMzIxMiwiZXhwIjoxNjk1NTI1MjEyfQ.BKxDem8fpgeDHrIgm3qCoF76ueHtQSG1DggiTl4FAaoNKt4gem6NTX1XYndPXqVj9TXfl-8yp2kKE3jY66dyig',
# 'version':'TYC-Web',
# 'Content-Type':'application/json;charset=UTF-8'
# }
# Direct DB connection, superseded by the BaseCore-managed one below:
# cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# cursor= cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '天眼查企业id/天眼查'
# Look up the Tianyancha id (and company name, etc.) by unified social credit code.
def getTycIdByXYDM(xydm):
    """Query Tianyancha's suggest API for the company matching credit code *xydm*.

    Returns a dict:
      state   -- True only when the first hit matched by credit code
      tycData -- the first suggest hit (dict) on success, else None
      reput   -- False when the company was not found at all (caller should
                 not requeue it), True otherwise
    """
    retData = {'state': False, 'tycData': None, 'reput': True}
    url = f"https://capi.tianyancha.com/cloud-tempest/search/suggest/v3?_={baseCore.getNowTime(3)}"
    ip = baseCore.get_proxy()
    paramJsonData = {'keyword': xydm}
    try:
        # headers['User-Agent'] = baseCore.getRandomUserAgent()
        # headers['X-AUTH-TOKEN'] = baseCore.GetTYCToken()
        response = requests.post(url, json=paramJsonData, headers=headers, verify=False, proxies=ip)
        try:
            # Throttle so we do not hammer the API between requests.
            time.sleep(random.randint(3, 5))
            retJsonData = json.loads(response.content.decode('utf-8'))
            if not (retJsonData['data'] and retJsonData['state'] == 'ok'):
                log.error(f"---{xydm}-未查询到该企业---")
                retData['reput'] = False
                return retData
            matchType = retJsonData['data'][0]['matchType']
            if matchType == '信用代码匹配':
                retData['state'] = True
                retData['tycData'] = retJsonData['data'][0]
            else:
                # A hit came back, but not by credit code — log the payload.
                log.error(f"{xydm}------{retJsonData}")
            return retData
        finally:
            # Always release the connection; the original leaked it on the
            # "not found" early return.
            response.close()
    except Exception as e:
        # Most failures here are an expired X-AUTH-TOKEN, but log the real
        # error too instead of swallowing it (the original used a bare except).
        log.error(f"---{xydm}--天眼查token失效---{e}")
        return retData
# Refresh Tianyancha basic info for companies queued in Redis.
def updateTycInfo():
    """Consume social credit codes from Redis and backfill missing TYC ids.

    Loops forever: pops a code from NewsEnterprise:gnqy_socialCode, looks the
    company up via getTycIdByXYDM, and writes the Tianyancha id back to
    EnterpriseInfo. Codes that fail transiently are requeued.
    """
    while True:
        social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
        # social_code = '9111000066990444XF'
        # Queue drained: wait a bit and poll again.
        if social_code is None:
            time.sleep(20)
            continue
        start = time.time()
        data = baseCore.getInfomation(social_code)
        if len(data) == 0:
            # No DB record yet; push the code back so it is retried later.
            baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
            continue
        xydm = data[2]
        tycid = data[11]
        if tycid is None or tycid == '':
            try:
                retData = getTycIdByXYDM(xydm)
                if retData['tycData'] and retData['reput']:
                    # BUG FIX: the id lives inside the tycData hit; the
                    # original read retData['id'], which raised KeyError.
                    tycid = retData['tycData']['id']
                    # Parameterized update — the original interpolated the
                    # values straight into the SQL string.
                    updateSql = "update EnterpriseInfo set TYCID = %s where SocialCode = %s"
                    cursor_.execute(updateSql, (tycid, xydm))
                    cnx_.commit()
                elif not retData['tycData'] and retData['reput']:
                    # Transient lookup failure: record it and requeue the code.
                    state = 0
                    takeTime = baseCore.getTimeCost(start, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
                    log.info(f'======={social_code}====重新放入redis====')
                    baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
                    continue
                elif not retData['reput'] and not retData['tycData']:
                    # Definitive "company not found": drop it, do not requeue.
                    continue
            except Exception as e:
                log.error(e)
                state = 0
                takeTime = baseCore.getTimeCost(start, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
                baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
                continue


if __name__ == '__main__':
    updateTycInfo()
\ No newline at end of file
import os
import redis
from flask import Flask, request, send_file, render_template, jsonify
import json
import pymysql
from pyquery import PyQuery as pq
from flask_cors import cross_origin
'''
手动捕获请求的接口数据,实现解析
使用fiddler将链接对应的页面数据信息发送到后台,后台对数据进行解析
'''
r = redis.Redis(host='127.0.0.1', port='6379', db=0)
def connMysql():
    """Open a connection to the caiji MySQL database.

    Returns a ``(connection, cursor)`` pair; the caller is responsible for
    closing both (see closeSql).
    """
    connection = pymysql.connect(host='114.115.159.144',
                                 user='caiji',
                                 password='zzsn9988',
                                 database='caiji')
    cursor = connection.cursor()
    return connection, cursor
def closeSql(conx, cursorM):
    """Release DB resources: close the cursor first, then the connection."""
    for resource in (cursorM, conx):
        resource.close()
# Insert one holding record into the qccholdmsg table.
def itemInsertToTable(item):
    """Insert *item* (a dict of qcc holding fields) into qccholdmsg.

    De-duplicates on (yKeyNo, zKeyNo). Commits on success and always closes
    the connection (the original leaked it on the "already exists" path).
    """
    conx, cursorM = connMysql()
    try:
        zKeyNo = item['zKeyNo']
        yKeyNo = item['yKeyNo']
        try:
            # Parameterized query — the original interpolated the values into
            # the SQL string, which is SQL-injectable.
            select_sql = 'select * from qccholdmsg where yKeyNo=%s and zKeyNo=%s'
            cursorM.execute(select_sql, (yKeyNo, zKeyNo))
            existing_record = cursorM.fetchone()
        except Exception as e:
            # Best-effort duplicate check, as in the original — but log the
            # failure instead of swallowing it silently.
            print(f'查重失败:{e}')
            existing_record = ''
        if existing_record:
            print(f'数据已存在!{zKeyNo}')
            return
        insert_param = (item['yKeyNo'], item['yCompanyName'], item['nameCount'], item['zKeyNo'], item['zName'],
                        item['registCapi'], item['province'], item['industry'], item['shortStatus'],
                        item['percentTotal'], item['startDateStr'], item['h5Url'], item['district'],
                        item['industryDesc'], item['area'], item['industryItem'])
        insert_sql = "INSERT into qccholdmsg (yKeyNo,yCompanyName,nameCount,zKeyNo,zName,registCapi,province," \
                     "industry,shortStatus,percentTotal,startDateStr,h5Url,district,industryDesc,area,industryItem) " \
                     "VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        cursorM.execute(insert_sql, insert_param)
        conx.commit()
        print('数据插入成功!')
    finally:
        closeSql(conx, cursorM)
app = Flask(__name__)


@app.route('/')
@cross_origin()
def index():
    # Simple landing/liveness endpoint.
    return 'Welcome to the website!'
@app.route('/get_hold', methods=['POST'])
@cross_origin()
def get_news():
    """Receive captured holding data via a form POST.

    BUG FIX: the original view had no return statement, so Flask raised
    "view function did not return a valid response" on every request.
    TODO: the payload is read but never processed — presumably it should be
    parsed and stored via itemInsertToTable; confirm the intended flow.
    """
    data = request.form
    return 'success'
@app.route('/ws/setCookie', methods=['GET'])
# @cross_origin()
def setCookie():
    """Store the ``cookie`` query parameter into the Redis set ``wscookie``."""
    try:
        cookie = request.args.get('cookie')
        r.sadd('wscookie', cookie)
    except Exception as e:
        # Log the actual failure instead of a bare 'error' marker.
        print(f'error:{e}')
    # BUG FIX: response typo 'succes' -> 'success', matching the other endpoints.
    return 'success'
@app.route('/ws/getCookieSize', methods=['GET'])
@cross_origin()
def getCookieSize():
    """Report how many cookies are stored in the Redis set ``wscookie``."""
    try:
        payload = {
            "code": 200,
            "msg": "操作成功",
            "data": r.scard('wscookie'),
        }
    except Exception:
        # On Redis failure, report zero cookies (same status code, failure msg).
        payload = {
            "code": 200,
            "msg": "操作失败",
            "data": 0,
        }
    return jsonify(payload)
@app.route('/ws/getHtml', methods=['POST'])
# @cross_origin()
def getnewMonth():
    """Extract the selected end-month from posted HTML and cache it in Redis."""
    try:
        page = pq(request.form.get('html'))
        selected_month = page('select[id="endMonth"]>option[selected="selected"]').text()
        r.set('wsmsg', selected_month)
    except Exception:
        print('error')
    return 'success'
if __name__ == '__main__':
    # Serve the capture endpoints on port 8033.
    app.run(port=8033)
// Return a random alphanumeric string of `size` characters.
// BUG FIX: the original declared `function r(size){` twice (a stray duplicated
// first line), leaving an unclosed outer function — a syntax error.
function r(size) {
    var str = "",
        arr = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'];
    // NOTE: Math.round over [0, len-1] slightly under-weights the first and
    // last entries; kept as-is for compatibility with the site's script.
    for (var i = 0; i < size; i++) {
        str += arr[Math.round(Math.random() * (arr.length - 1))];
    }
    return str;
}
// Encode each character of `str` as its binary char code, space separated,
// e.g. "ab" -> "1100001 1100010".
function strTobinary(str) {
    var pieces = [];
    for (var idx = 0; idx < str.length; idx++) {
        if (idx !== 0) {
            pieces.push(" ");
        }
        pieces.push(str.charCodeAt(idx).toString(2));
    }
    return pieces.join("");
}
// Produce a 24-char random salt. The timestamp and iv (yyyymmdd) are computed
// but — as in the original — only the salt is returned.
function cipher() {
    var date = new Date();
    var timestamp = date.getTime().toString();
    var salt = r(24);
    var year = date.getFullYear().toString();
    // BUG FIX: the original else-branch used date.getMonth() (0-based and not
    // padded); both branches now yield the 1-based calendar month.
    var month = (date.getMonth() + 1 < 10 ? "0" + (date.getMonth() + 1) : (date.getMonth() + 1)).toString();
    var day = (date.getDate() < 10 ? "0" + date.getDate() : date.getDate()).toString();
    var iv = year + month + day;
    return salt;
}
// Concatenate salt + iv + enc and encode the whole string as
// space-separated binary (via strTobinary).
function des(salt, iv, enc) {
    // var enc = des3(timestamp, salt, iv).toString();
    return strTobinary(salt + iv + enc);
}
// Generate a random 24-character alphanumeric token.
function token() {
    var chars = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'];
    var out = "";
    for (var i = 0; i < 24; i++) {
        out += chars[Math.round(Math.random() * (chars.length - 1))];
    }
    return out;
}
// Generate a random 32-character page id from [A-Za-z0-9].
function pageid() {
    var alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    var id = "";
    for (var i = 0; i < 32; i++) {
        id += alphabet.charAt(Math.floor(Math.random() * alphabet.length));
    }
    return id;
}
// console.log(cipher());
\ No newline at end of file
import time
# NOTE(review): duplicate `import time` below — harmless, kept as-is.
import time
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from BaseCore import BaseCore

# Shared crawler infrastructure: logger plus MySQL connection/cursor.
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# NOTE(review): `agent` is computed but never used below — confirm before removing.
agent = baseCore.getRandomUserAgent()
def flushAndGetToken():
    """Harvest login cookies from the shared browser and persist them.

    Refreshes wenshu.court.gov.cn in the module-level ``browser``, waits for
    the logged-in marker element, concatenates all cookies into a single
    "name=value;" string, extracts the user name (a phone number) from the
    page, and inserts both into the cpwsw_user table.

    Relies on module-level globals: browser, cursor_, cnx_, log.
    """
    browser.refresh()
    wait = WebDriverWait(browser, 10)
    # The #loginLi element is only present once the user is logged in.
    wait.until(EC.presence_of_element_located((By.ID, "loginLi")))
    cookie_list = browser.get_cookies()
    cookies = ''
    for cookie in cookie_list:
        cookies=cookies+cookie['name']+"="+cookie['value']+";"
    info = browser.page_source
    # res_2 = requests.get(year_url, proxies=ip)
    soup = BeautifulSoup(info, 'html.parser')
    # The element text looks like "欢迎您,13683816984" ("Welcome, <phone>").
    aEle = soup.find('li', attrs={'id':'loginLi'})
    text = aEle.text
    user_name = text.replace("欢迎您,","")
    # Earlier selector attempts, kept for reference:
    #user_name = soup.select('a[id="loginLi"]')[0].text.replace("欢迎您,","")
    #user_name = soup.find('a', id_='loginLi').text.replace("欢迎您,","")
    if len(user_name)<3:
        # Remainder too short to be a phone number: login did not complete.
        log.info("没有登录成功,请重试")
        return
    # fenghao_time (ban time) is backdated one day so the account counts as usable now.
    cursor_.execute("insert into cpwsw_user (user,cookie,create_time,update_time,fenghao_time) values (%s,%s,now(),now(),DATE_SUB(NOW(), INTERVAL 1 DAY))",[user_name,cookies])
    cnx_.commit()
    log.info("保存成功")
# Chrome setup: fixed user agent, ignore certificate errors, and hide the
# usual automation fingerprints from the target site.
opt = webdriver.ChromeOptions()
#opt.add_argument('user-agent=' + baseCore.getRandomUserAgent())
#opt.add_argument('user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument('user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
# Hard-coded local Chrome/chromedriver paths for the crawling workstation.
#opt.binary_location = r'C:/Program Files/Google/Chrome/Application/chrome.exe'
opt.binary_location = r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
chromedriver =r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
service = Service(chromedriver)
browser = webdriver.Chrome(options=opt, service=service)
url = "https://wenshu.court.gov.cn/"
browser.get(url)
# Log in manually (by phone) in the opened browser window, then the cookies
# are harvested into the database.
# NOTE(review): the original comment said "wait 200 s" but the sleep is 60 s
# — confirm which is intended.
time.sleep(60)
flushAndGetToken()
cursor_.close()
cnx_.close()
baseCore.close()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论