Commit fcfdc422 by 刘伟刚

Customs code commit 11

Parent 237f5376
[redis]
;host=127.0.0.1
host=192.168.1.234
port=6379
pass=
[mysql]
host=114.115.159.144
username=caiji
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
[selenium]
chrome_driver=D:\chrome\chromedriver.exe
binary_location=D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe
[param]
path=D:\haiguan\codets
year=2023,2022
endMonth=7,6,5,4,3,2,1
import configparser
import glob
import os
import pandas as pd
def combinFiles(inFileName, outFileName):
    # find all CSV part files matching the pattern (current directory and subdirectories)
    files = glob.glob(inFileName, recursive=True)
    # empty DataFrame to collect the merged data
    merged_data = pd.DataFrame()
    # read the CSV files one by one and append them to the DataFrame
    for file in files:
        try:
            print('------' + file)
            data = pd.read_csv(file, encoding='gbk', dtype=str, keep_default_na=False)
        except Exception as e2:
            print(e2)
            continue  # skip unreadable files instead of reusing data from the previous one
        # drop the last (empty) column
        data = data.iloc[:, :-1]
        dad = pd.DataFrame(data, dtype=str)
        merged_data = pd.concat([merged_data, dad], ignore_index=True)
    # save the merged data to a new CSV file
    merged_data.to_csv(outFileName, encoding='gbk', index=False, quoting=1, quotechar='"', escapechar='\\')
    print(f'Output file: {outFileName}')
    print('Merge finished!')
    return outFileName
def fileclearn(csvFile, outxlsFile, recordName, iEType_name):
    df = pd.read_csv(csvFile, encoding='gbk', dtype=str)
    # make sure the commodity-code column is treated as text
    df['商品编码'] = df['商品编码'].astype(str)
    # left-pad codes shorter than 8 digits with zeros
    df['商品编码'] = df['商品编码'].str.zfill(8)
    df['商品名称'] = df['商品名称'].str.replace('\r', '')
    # add the report period and the import/export flag as the first two columns
    df.insert(0, '进出口标识', iEType_name)
    df.insert(0, '报告期', recordName)
    # locate the USD column and drop every column after it
    usd_column_index = df.columns.get_loc('美元')
    df = df.iloc[:, :usd_column_index + 1]
    # strip thousands separators and convert the value columns to numbers
    df['美元'] = df['美元'].str.replace(',', '').astype(float)
    df['第一数量'] = pd.to_numeric(df['第一数量'].str.replace(',', '').replace('-', ''), errors='coerce')
    # replace NaN with 0
    df['第一数量'] = df['第一数量'].fillna(0)
    df['第二数量'] = pd.to_numeric(df['第二数量'].str.replace(',', '').replace('-', ''), errors='coerce')
    df['第二数量'] = df['第二数量'].fillna(0)
    # save the cleaned DataFrame as an xlsx file
    df.to_excel(outxlsFile, index=False, engine='openpyxl')
    print('Data cleaning finished!')
    return outxlsFile
"""
文件合并
文件路径
D:\hg\2023\07\单月\收发货地址\*--进口.csv
D:\hg\2023\07\累计\收发货地址\
参数 year endmonth 字段类型 进出口类型
"""
def getFileName(year, startMonth, endMonth, field_name, iEType_name):
    path = 'D:\\hg\\'
    if startMonth < endMonth:
        start_str = '01'
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + start_str + '-' + end_str + '--' + field_name + '--' + iEType_name + '-*.csv'
        filepath = path + str(year) + '\\' + end_str + '\\累计\\' + field_name + '\\'
    else:
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + end_str + '--' + field_name + '--' + iEType_name + '-*.csv'
        filepath = path + str(year) + '\\' + end_str + '\\单月\\' + field_name + '\\'
    filename = os.path.join(filepath, filename)
    return filename
def getRecordFileName(year, startMonth, endMonth, field_name, iEType_name):
    if startMonth < endMonth:
        start_str = '01'
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + start_str + '-' + end_str
    else:
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + end_str
    return filename
def getOutFileName(year, startMonth, endMonth, field_name, iEType_name):
    path = 'D:\\hg\\'
    if startMonth < endMonth:
        start_str = '01'
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + start_str + '-' + end_str + '--' + field_name + '商品--' + iEType_name + '.csv'
    else:
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + end_str + '--' + field_name + '商品--' + iEType_name + '.csv'
    filepath = path + str(year) + '\\' + end_str + '\\'
    filename = os.path.join(filepath, filename)
    return filename
def getOutFileNameXls(year, startMonth, endMonth, field_name, iEType_name):
    path = 'D:\\hg\\'
    if startMonth < endMonth:
        start_str = '01'
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + start_str + '-' + end_str + '--' + field_name + '商品--' + iEType_name + '.xlsx'
    else:
        end_str = "{:02d}".format(endMonth)
        filename = str(year) + end_str + '--' + field_name + '商品--' + iEType_name + '.xlsx'
    filepath = path + str(year) + '\\' + end_str + '\\'
    filename = os.path.join(filepath, filename)
    return filename
# read the config and drive the whole merge/clean pipeline
def readConfig():
    config = configparser.ConfigParser()
    config.read('config.ini')
    years = config.get('param', 'year')
    endMonths = config.get('param', 'endMonth')
    field_names = ['贸易伙伴']
    # field_names = ['收发货地址', '贸易方式']
    iEType_names = ['进口', '进出口', '出口']
    for yy in years.split(','):
        year = int(yy)
        for emm in endMonths.split(','):
            endMonth = int(emm)
            # always process the cumulative range (from January); for months after
            # January also process the single-month range (startMonth == endMonth)
            startMonths = [1]
            if endMonth > 1:
                startMonths.append(endMonth)
            for smm in startMonths:
                startMonth = int(smm)
                for field_name in field_names:
                    for iEType_name in iEType_names:
                        infileName = getFileName(year, startMonth, endMonth, field_name, iEType_name)
                        outFileName = getOutFileName(year, startMonth, endMonth, field_name, iEType_name)
                        outfileNameXls = getOutFileNameXls(year, startMonth, endMonth, field_name, iEType_name)
                        print(infileName)
                        print(outFileName)
                        # merge the part files
                        outfileNamecsv = combinFiles(infileName, outFileName)
                        # clean the merged data and save it as an Excel file
                        recordName = getRecordFileName(year, startMonth, endMonth, field_name, iEType_name)
                        outfileNameXls = fileclearn(outfileNamecsv, outfileNameXls, recordName, iEType_name)
                        print(outfileNameXls)

if __name__ == '__main__':
    readConfig()
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import configparser
import redis
import concurrent.futures
from pyquery import PyQuery as pq
class hgCookie(object):
    def __init__(self):
        # load settings from config.ini
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.r = redis.Redis(host=self.config.get('redis', 'host'),
                             port=self.config.get('redis', 'port'),
                             password=self.config.get('redis', 'pass'), db=0)
        self.bin_path = self.config.get('selenium', 'binary_location')
        self.driver_path = self.config.get('selenium', 'chrome_driver')

    def get_webdriver(self):
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--ignore-certificate-errors')
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument('--headless')
        chrome_options.binary_location = self.bin_path
        executable_path = self.driver_path
        driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
        return driver
    def reqGetCookie(self):
        url = 'http://stats.customs.gov.cn/queryData/queryDataByWhere'
        driver = self.get_webdriver()
        driver.get(url)
        # wait (up to 10 seconds) until the search button is present
        wait = WebDriverWait(driver, 10)
        wait.until(EC.presence_of_element_located((By.ID, "doSearch")))
        # collect every cookie the page has set
        cookies = driver.get_cookies()
        # build a single cookie header string
        cookie_str = '; '.join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
        # store the cookie string in the Redis pool
        self.r.sadd('hgcookie', cookie_str)
        driver.quit()

    def runSpider(self, i):
        print(i)
        self.reqGetCookie()

    def getnewMonth(self):
        url = 'http://stats.customs.gov.cn/queryData/queryDataByWhere'
        driver = self.get_webdriver()
        driver.get(url)
        # wait (up to 10 seconds) until the search button is present
        wait = WebDriverWait(driver, 10)
        wait.until(EC.presence_of_element_located((By.ID, "doSearch")))
        html = driver.page_source
        doc = pq(html)
        endMonth = doc('select[id="endMonth"]>option[selected="selected"]').text()
        print(f'Latest month on the customs page: {endMonth}')
        self.r.set('newMonth', endMonth)
        driver.quit()
if __name__ == '__main__':
    spider = hgCookie()
    spider.getnewMonth()
    while True:
        size = spider.r.scard('hgcookie')
        print(f'Cookies in the pool: {size}')
        if size > 100:
            # pool is full enough; wait before checking again
            time.sleep(60)
            continue
        kwList = list(range(1, 101))
        if kwList:
            # thread pool with 2 workers; each task fetches one cookie
            with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
                results = [executor.submit(spider.runSpider, data) for data in kwList]
                # collect the results as the tasks complete
                for future in concurrent.futures.as_completed(results):
                    try:
                        result = future.result()
                        print(f"Task finished: {result}")
                    except Exception as e:
                        print(f"Task raised an exception: {e}")
import redis
from flask import Flask, request, jsonify
import pymysql
from pyquery import PyQuery as pq
from flask_cors import cross_origin
'''
Manually capture the requested interface data and parse it.
Fiddler forwards the page data for the target links to this backend, which parses it.
'''
r = redis.Redis(host='127.0.0.1', port=6379, db=0)
def connMysql():
    # open a MySQL connection
    conx = pymysql.connect(host='114.115.159.144',
                           user='caiji',
                           password='zzsn9988',
                           database='caiji')
    # create a cursor
    cursorM = conx.cursor()
    return conx, cursorM

def closeSql(conx, cursorM):
    # close the cursor and the connection
    cursorM.close()
    conx.close()

# insert one record into the qccholdmsg table
def itemInsertToTable(item):
    conx, cursorM = connMysql()
    zKeyNo = item['zKeyNo']
    yKeyNo = item['yKeyNo']
    try:
        # parameterized query to avoid SQL injection
        select_sql = 'select * from qccholdmsg where yKeyNo=%s and zKeyNo=%s'
        cursorM.execute(select_sql, (yKeyNo, zKeyNo))
        existing_record = cursorM.fetchone()
    except Exception as e:
        existing_record = ''
    if existing_record:
        print(f'Record already exists: {zKeyNo}')
        closeSql(conx, cursorM)
        return
    insert_param = (item['yKeyNo'], item['yCompanyName'], item['nameCount'], item['zKeyNo'], item['zName'],
                    item['registCapi'], item['province'], item['industry'], item['shortStatus'], item['percentTotal'], item['startDateStr'],
                    item['h5Url'], item['district'], item['industryDesc'], item['area'], item['industryItem'])
    insert_sql = "INSERT into qccholdmsg (yKeyNo,yCompanyName,nameCount,zKeyNo,zName,registCapi,province," \
                 "industry,shortStatus,percentTotal,startDateStr,h5Url,district,industryDesc,area,industryItem) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    # run the insert and commit
    cursorM.execute(insert_sql, insert_param)
    conx.commit()
    print('Record inserted.')
    closeSql(conx, cursorM)
app = Flask(__name__)

@app.route('/')
@cross_origin()
def index():
    return 'Welcome to the website!'

@app.route('/get_hold', methods=['POST'])
@cross_origin()
def get_news():
    # this handler is still a stub in this commit: it only reads the form data
    # and does not yet parse it or call itemInsertToTable
    data = request.form
    return 'success'

@app.route('/task/setCookie', methods=['GET'])
# @cross_origin()
def setCookie():
    try:
        cookie = request.args.get('cookie')
        r.sadd('hgcookie', cookie)
    except Exception as e:
        print('error')
    return 'success'
@app.route('/task/getCookieSize', methods=['GET'])
@cross_origin()
def getCookieSize():
    try:
        size = r.scard('hgcookie')
        data = {
            "code": 200,
            "msg": "操作成功",
            "data": size
        }
    except Exception as e:
        data = {
            "code": 200,
            "msg": "操作失败",
            "data": 0
        }
    return jsonify(data)
@app.route('/task/getHtml', methods=['POST'])
# @cross_origin()
def getnewMonth():
    try:
        html = request.form.get('html')
        doc = pq(html)
        endMonth = doc('select[id="endMonth"]>option[selected="selected"]').text()
        print(f'Latest month on the customs page: {endMonth}')
        r.set('newMonth', endMonth)
    except Exception as e:
        print('error')
    return 'success'

if __name__ == '__main__':
    app.run(port=8002)
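For reference, a minimal client-side sketch of how the Fiddler-side script could drive these endpoints. It assumes the service is running locally on port 8002 as configured above; the cookie value and the file name are placeholders, not part of this commit:

import requests

BASE = 'http://127.0.0.1:8002'

# push one freshly captured single-use cookie into the pool
requests.get(BASE + '/task/setCookie', params={'cookie': 'JSESSIONID=placeholder'})

# check how many cookies are currently pooled
print(requests.get(BASE + '/task/getCookieSize').json())

# forward a captured query page so the backend can extract the latest month
with open('captured_page.html', encoding='utf-8') as f:
    requests.post(BASE + '/task/getHtml', data={'html': f.read()})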
import os
import sys
import logbook
import logbook.more

class LogRecord(object):
    # log line format
    def logFormate(self, record, handler):
        formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
            date=record.time,                             # timestamp
            level=record.level_name,                      # log level
            filename=os.path.split(record.filename)[-1],  # file name
            func_name=record.func_name,                   # function name
            lineno=record.lineno,                         # line number
            msg=record.message                            # message
        )
        return formate

    # build a logger
    def getLogger(self, fileLogFlag=True, stdOutFlag=True):
        dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
        dirname = os.path.join(dirname, "logs")
        filename = filename.replace(".py", "") + ".log"
        if not os.path.exists(dirname):
            os.mkdir(dirname)
        logbook.set_datetime_format('local')
        logger = logbook.Logger(filename)
        logger.handlers = []
        if fileLogFlag:  # write log records to a daily-rotated file
            logFile = logbook.TimedRotatingFileHandler(os.path.join(dirname, filename), date_format='%Y-%m-%d',
                                                       bubble=True, encoding='utf-8')
            logFile.formatter = self.logFormate
            logger.handlers.append(logFile)
        if stdOutFlag:  # also print log records to the screen
            logStd = logbook.more.ColorizedStderrHandler(bubble=True)
            logStd.formatter = self.logFormate
            logger.handlers.append(logStd)
        return logger
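A short usage sketch for this logger (standard logbook Logger methods; the messages are illustrative):

logger = LogRecord().getLogger()
logger.info('customs download started')
logger.error('cookie pool is empty')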
[param]
# page size for each query
pageSize: 10
# import/export type: 1 = import, 0 = export, 10 = both
iEType: 1,0,10
# currency: rmb or usd
currencyType: usd
# year
year: 2023
# start month
startMonth: 7
# end month
endMonth: 7
#
monthFlag:
unitFlag: true
unitFlag1: true
# commodity code length
codeLength: 8
# query field parameters (commodity code)
outerField1: CODE_TS
outerField2:
outerField3:
outerField4:
# commodity code values
outerValue1:
outerValue2:
outerValue3:
outerValue4:
orderType: CODE ASC DEFAULT
# data format: set to 1 when downloading data for 202201 and later, 2 for data
# before 2022; the format boundary is 202111 (2 for that month and earlier)
selectTableState: 1
currentStartTime: 202201
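The selectTableState comment encodes a simple cutoff. A small helper sketch under that reading (the function name is hypothetical; the comment leaves the 202112 gap ambiguous, so the sketch simply treats everything after 202111 as the new format):

# sketch: choose selectTableState from the boundary described above
def select_table_state(year, month):
    period = year * 100 + month          # e.g. 2022, 1 -> 202201
    return 2 if period <= 202111 else 1  # old format up to 202111, new format after

print(select_table_state(2021, 11))  # 2 (old format)
print(select_table_state(2022, 1))   # 1 (new format)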
import csv

# earlier experiments with reading and rewriting a problem CSV, kept for reference:
# filePath = r'D:\hg\2023\08\累计\贸易伙伴\202301-08--贸易伙伴--进口-307.csv'
# with open(filePath) as csvfile:
#     reader = csv.reader(csvfile)
#     # skip the header row
#     # next(reader)
#     try:
#         for row in reader:
#             print(row)
#     except csv.Error as e:
#         print(f'Error reading CSV file: {e}')
#
# filePathName = 'test.csv'
# # copy the input file row by row to the output file
# with open(filePath, 'r') as input_csv, open(filePathName, 'w', newline='') as output_csv:
#     csv_reader = csv.reader(input_csv)
#     csv_writer = csv.writer(output_csv)
#     for row in csv_reader:
#         csv_writer.writerow(row)

# split a list of commodity codes into comma-separated groups of 8
# (presumably one batch per download request)
def group_elements(codes):
    groups = [codes[i:i+8] for i in range(0, len(codes), 8)]
    result = [','.join(group) for group in groups]
    return result

cc = ['12', '13', '12', '15', '1', '2', '3', '4', '6', '8', '9']
aa = group_elements(codes=cc)
print(aa)  # ['12,13,12,15,1,2,3,4', '6,8,9']
1. Customs collection overview
The customs collection mainly downloads the target statistics data.
The hard part is obtaining and storing cookies, because a cookie from the customs site becomes invalid after a single use.
The customs site returns at most 10,000 records per download, so downloads have to be split on the code parameter according to the query conditions.
2. Data storage: the downloaded data can contain embedded line breaks, malformed rows, and similar issues.
1. Cookie collection
A simulated browser requests the customs site home page to obtain a cookie, which is saved to Redis; each cookie is deleted as soon as it is used (a consumer-side sketch follows the parameter list below).
2. Download interface
Download parameters:
outerField1: CODE_TS #commodity code
outerField2: ORIGIN_COUNTRY #trading partner
outerField3: TRADE_MODE #trade mode
outerField4: TRADE_CO_PORT #shipping/receiving address
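Since each cookie is valid for exactly one request, the consuming side can rely on Redis SPOP, which returns and removes one set member atomically. A minimal consumer sketch, assuming the same 'hgcookie' pool that the scripts above fill (the actual download request is elided):

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)
cookie = r.spop('hgcookie')  # take one cookie out of the pool and delete it in the same atomic step
if cookie is not None:
    headers = {'Cookie': cookie.decode('utf-8')}
    # ... send exactly one download request with these headers; the used cookie is already gone from the pool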