提交 39bc90e5 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

import json
import re
import time
from itertools import groupby
from operator import itemgetter
import pymysql
import redis
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages import urllib3
from retry import retry
from base import BaseCore
urllib3.disable_warnings()
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4')
cursor = cnx.cursor()
URL = 'https://www.nasdaq.com/'
session = requests.session()
session.mount('https://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
session.mount('http://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
taskType = '财务数据/纳斯达克'
# 判断股票代码是否存在
@retry(tries=3, delay=1)
def check_code(com_code):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
res = r.exists('caiwu_nasdaq_code::' + com_code)
# 如果key存在 则不是第一次采集该企业, res = 1
if res:
return False # 表示不是第一次采集
else:
return True # 表示是第一次采集
# 判断采集日期是否存在
@retry(tries=3, delay=1)
def check_date(com_code, info_date):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
res = r.sismember('caiwu_nasdaq_code::' + com_code, info_date) # 注意是 保存set的方式
if res:
return True
else:
return False
# 将采集后的股票代码对应的报告期保存进redis
@retry(tries=3, delay=1)
def add_date(com_code, date_list):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
# 遍历date_list 放入redis
for date in date_list:
res = r.sadd('caiwu_nasdaq_code::' + com_code, date)
# 数据发送端口
def sendData(start_time, social_code, gpdm, dic_info):
data = json.dumps(dic_info)
url_baocun = 'http://114.115.236.206:8088/sync/finance/nsdk'
for nnn in range(0, 3):
try:
res_baocun = requests.post(url_baocun, data=data)
log.info(f'{social_code}=={gpdm}财务数据保存接口成功')
break
except:
log.error(f'{social_code}=={gpdm}财务数据保存接口失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{social_code}===财务数据保存接口失败')
time.sleep(1)
# 获取单位
def getUnit(gpdm):
url = f'https://www.nasdaq.com/market-activity/stocks/{gpdm}/financials'
req = requests.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'lxml')
unit = soup.find('div', class_='financials__note').text.split(' ')[1].lstrip().strip()
unit = f'(千){unit}'
req.close()
return unit
# 获取财务数据列表
def getlist(table, tableName):
list = []
years = table['headers']
datas = table['rows']
for data in datas:
for i in range(2, len(data) + 1):
name = data['value1']
value = data[f'value{i}']
if any(char.isdigit() for char in value):
value = re.sub(r"[^\d+-]", "", value)
else:
value = '-'
date = years[f'value{i}'].split('/')[2] + '-' + years[f'value{i}'].split('/')[0] + '-' + \
years[f'value{i}'].split('/')[1]
list.append({f'{tableName}': name, 'value': value, 'date': date, })
return list
# 财务数据按年份整合
def combanBydate(balance_list):
listbydates = []
balance_list.sort(key=itemgetter('date'))
groups = groupby(balance_list, key=itemgetter('date'))
for date, group in groups:
# 迭代表达式 一个日期的表
listbydate = [item for item in group]
listbydates.append([date, listbydate])
return listbydates
# 构造规范财务数据列表
def reviseData(lists, unit, tableName):
list_r = []
for data in lists:
list = {
'enName': data[f'{tableName}'],
'value': data['value'],
'unit': unit
}
list_r.append(list)
return list_r
# 获取年度财务数据
def getYear(start_time, session, social_code, gpdm):
ynFirst = check_code(social_code)
date_list = []
url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=1'
try:
req = session.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
data = req.json()['data']
if data:
unit = getUnit(gpdm)
all_list = []
lrb_list = getlist(data['incomeStatementTable'], 'lrb')
zcfz_list = getlist(data['balanceSheetTable'], 'zcfz')
xjll_list = getlist(data['cashFlowTable'], 'xjll')
for list in lrb_list:
all_list.append(list)
for list in zcfz_list:
all_list.append(list)
for list in xjll_list:
all_list.append(list)
all_group = combanBydate(all_list)
date_list = []
for date, final_list in all_group:
# 判断该报告期是否已采过
panduan = check_date(social_code, date + '-year')
if panduan:
continue
xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
lrb_list_f = reviseData([item for item in final_list if 'lrb' in item], unit, 'lrb')
dic_info = {
"socialCreditCode": social_code,
"securitiesCode": gpdm,
"date": date,
"debt": zcfz_list_f,
"profit": lrb_list_f,
"cash": xjll_list_f,
'dateFlag': 'year',
"ynFirst": ynFirst,
}
sendData(start_time, social_code, gpdm, dic_info)
date_list.append(date + '-year')
else:
log.error(f'找不到{social_code}=={gpdm}年度财务数据')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据')
except:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败')
return date_list
# 获取季度财务数据 需要判断日期是否取与年度数据日期重合,重合需要修改类型为dateFlag字段为year
def getQuarter(start_time, session, social_code, gpdm):
ynFirst = check_code(social_code)
date_list = []
url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=2'
try:
req = session.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
data = req.json()['data']
if data:
unit = getUnit(gpdm)
all_list = []
lrb_list = getlist(data['incomeStatementTable'], 'lrb')
zcfz_list = getlist(data['balanceSheetTable'], 'zcfz')
xjll_list = getlist(data['cashFlowTable'], 'xjll')
for list in lrb_list:
all_list.append(list)
for list in zcfz_list:
all_list.append(list)
for list in xjll_list:
all_list.append(list)
all_group = combanBydate(all_list)
for date, final_list in all_group:
# 判断该报告期是否已采过
panduan = check_date(social_code, date + '-quarter')
if panduan:
continue
xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
lrb_list_f = reviseData([item for item in final_list if 'lrb' in item], unit, 'lrb')
dic_info = {
"socialCreditCode": social_code,
"securitiesCode": gpdm,
"date": date,
"debt": zcfz_list_f,
"profit": lrb_list_f,
"cash": xjll_list_f,
'dateFlag': 'quarter',
"ynFirst": ynFirst,
}
# 判断季度数据年份是否与年度数据年份相投
panduan_flag = check_date(social_code, date + '-year')
if panduan_flag:
dic_info['dateFlag'] = 'year'
sendData(start_time, social_code, gpdm, dic_info)
date_list.append(date + '-quarter')
else:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据')
except:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败')
return date_list
def doJob():
# while True:
# social_code = baseCore.redicPullData('')
# datas_enterprise = baseCore.getInfomation(social_code)
session.get(URL, headers=headers)
# sql = "select * from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
# cursor.execute(sql)
# datas_enterprise = cursor.fetchall()
# for data_enterprise in datas_enterprise:
start_time = time.time()
# gpdm = data_enterprise[3]
# social_code = data_enterprise[6]
social_code = 'ZD0CN0012309000172'
gpdm = 'NTES'
# 采集年度数据
date_list_year = getYear(start_time, session, social_code, gpdm)
# 保存年度数据到redis
add_date(social_code, date_list_year)
# 采集季度数据
date_list_quarter = getQuarter(start_time, session, social_code, gpdm)
# 保存季度数据到redis
add_date(social_code, date_list_quarter)
timeCost = baseCore.getTimeCost(start_time, time.time())
state = 1
baseCore.recordLog(social_code, taskType, state, timeCost, '', '')
log.info(f'{social_code}=={gpdm}==耗时{timeCost}')
# break
cursor.close()
cnx.close()
if __name__ == '__main__':
doJob()
import configparser
import configparser
import re
import time
from urllib.parse import quote, unquote
import pymysql
import redis
import requests
import json
from pyquery import PyQuery as pq
from bs4 import BeautifulSoup
import difflib
import urllib3
from seleniumwire import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from base.BaseCore import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from operator import itemgetter
from itertools import groupby
import datetime
class SinaUsstock(object):
def __init__(self):
baseCore=BaseCore()
self.logger=baseCore.getLogger()
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=6)
self.driver=self.get_webdriver()
def conn11(self):
conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
charset='utf8')
cursor = conn.cursor()
return conn,cursor
def deal_table(self,doc_resp):
soup=BeautifulSoup(doc_resp,'html.parser')
tdoc=soup.select('div[class="tbl_wrap"]>table[class="data_tbl os_tbl"]')[1]
tbody=pq(str(tdoc))
uint=tbody.find('tbody>tr:nth-child(1)>th').text().split(':')[1]
pdate=tbody.find('tbody>tr:nth-child(1)>td').text().replace('至','').split(' ')
btds=tbody.find('tbody>tr:gt(0)')
seriesList=[]
for btd in btds:
tddoc=pq(btd)
seriesName=tddoc.find('th').text().replace('+','').replace('-','')
seriesValue=tddoc.find('td').text().split(' ')
for i in range(0,len(pdate)):
value=seriesValue[i]
if '亿' in value:
value = value.replace("亿", "*100000000")
value = eval(value)
elif '万' in value:
value = value.replace("万", "*10000")
value = eval(value)
vvla=str(value)
serisemsg={
'name':seriesName,
'value':vvla,
'ddte':pdate[i],
'uint':uint,
}
seriesList.append(serisemsg)
return seriesList
def getCodeFromRedis(self):
securitiescode=self.r.lpop('sina_usstock:securities_code')
securitiescode = securitiescode.decode('utf-8')
return securitiescode
def get_webdriver(self):
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--start-maximized")
# chrome_options.add_argument('--headless')
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
executable_path =self.config.get('selenium', 'chrome_driver')
driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
return driver
# 使用股票代码拼接地址访问新浪财经地址,通过定位现金浏览,资产负债,利润信息
#若执行中出现异常则将股票代码放回redis中,
def get_content2(self,securitiescode):
self.logger.info(f"需要采集的股票代码{securitiescode}")
conn,cursor=self.conn11()
try:
sql1 = f"select social_credit_code,securities_code,securities_short_name from sys_base_enterprise_ipo where securities_code='{securitiescode}' " # and stock_code = "SYNH"
cursor.execute(sql1)
result_data = cursor.fetchall()
except Exception as e:
self.logger.info("数据查询异常!")
return
for data in result_data:
try:
data_list = list(data)
social_credit_code = data_list[0]
stock = data_list[1]
stock2=str(stock)
url=f'http://quotes.sina.com.cn/usstock/hq/balance.php?s={stock2}'
self.driver.get(url)
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
#1.现金 cash 年度 季度
#点击现金标签加载页面 点击年度 解析数据 现金流量表
try:
self.driver.find_element(By.XPATH,'//div[@class="title"]//a[text()="现金流量表"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="年度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList1=self.deal_table(doc_resp1)
zbl1=self.groupdata(seriesList1,'cash')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击现金标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="季度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList2=self.deal_table(doc_resp1)
zbl2=self.groupdata(seriesList2,'cash')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#2.资产 debt 年度 季度
#点击资产标签加载页面 点击年度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="title"]//a[text()="资产负债表"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="年度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList3=self.deal_table(doc_resp1)
zbl3=self.groupdata(seriesList3,'debt')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击现金标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="季度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList4=self.deal_table(doc_resp1)
zbl4=self.groupdata(seriesList4,'debt')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击利资产标签加载页面 点击季度 解析数据
#3.利润 profit 年度 季度
#点击利润标签加载页面 点击年度 解析数据
#点击利润标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="title"]//a[text()="利润表"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="年度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList5=self.deal_table(doc_resp1)
zbl5=self.groupdata(seriesList5,'profit')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#点击现金标签加载页面 点击季度 解析数据
try:
self.driver.find_element(By.XPATH,'//div[@class="tbl_wrap"]/div[@align="right"]/a[text()="季度数据"]').click()
# 等待页面加载完成
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "grey")))
doc_resp1=self.driver.page_source
seriesList6=self.deal_table(doc_resp1)
zbl6=self.groupdata(seriesList6,'profit')
except Exception as e:
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
#转换数据格式发送接口
annualzb=zbl1+zbl3+zbl5
annualzb=self.groupZbData(annualzb,stock,social_credit_code,'annual')
self.sendToFinance(annualzb)
quarterzb=zbl2+zbl4+zbl6
quarterzb=self.groupZbData(quarterzb,stock,social_credit_code,'quarter')
self.sendToFinance(quarterzb)
self.logger.info(f'++++++++++股票:{stock}采集结束')
except Exception as e:
self.driver.quit()
time.sleep(3)
self.driver=self.get_webdriver()
self.logger.info(e)
self.r.rpush('sina_usstock:securities_code',securitiescode)
return
def sendToFinance(self,zbmsg):
for zbb in zbmsg:
if len(zbb) != 0:
# 调凯歌接口存储数据
data = json.dumps(zbb)
#暂无接口
url_baocun = ''
# url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
for nnn in range(0, 3):
try:
res_baocun = requests.post(url_baocun, data=data)
self.logger.info(res_baocun.text)
break
except:
time.sleep(1)
#zbList,stock,social_credit_code
def groupZbData(self,zbList,stock,social_credit_code,dateFlag):
self.logger.info('数据根据日期进行组合')
# 根据 date对数据分组
# 根据时间属性对列表内容进行分类
zbList.sort(key=itemgetter('date')) # 先按照age属性进行排序
zbgroups = groupby(zbList, key=itemgetter('date')) # 根据age属性进行分组
# 遍历每个分组,并打印分类结果
zbList=[]
for date, group in zbgroups:
result={}
for item in group:
for key, value in item.items():
if key == "date":
continue
if key not in result:
result[key] = []
result[key].extend(value)
result["date"] = date
result["securitiesCode"] = stock
result["socialCreditCode"] = social_credit_code
result["dateFlag"] = dateFlag
result["ynFirst"] = False
# "securitiesCode": "2342",
# "socialCreditCode": "12314",
# "date": "2023-06-31",
# "dateFlag": "quarter",
# "ynFirst": false
zbList.append(result)
return zbList
#表数据和对应财务指标
def groupdata(self,ssMsg,zbtype):
self.logger.info('对数据进行指标数据进分组')
# 根据时间属性对列表内容进行分类
ssMsg.sort(key=itemgetter('ddte')) # 先按照age属性进行排序
groups = groupby(ssMsg, key=itemgetter('ddte')) # 根据age属性进行分组
# 遍历每个分组,并打印分类结果
zbList=[]
for ddte, group in groups:
# print(f"ddte: {ddte}")
ssList=[]
for item in group:
ii={
"enName": "",
"name": item['name'],
"value": item['value'],
"uint": item['uint']
}
ssList.append(ii)
ddrit={
"date":ddte,
zbtype:ssList
}
zbList.append(ddrit)
return zbList
def getFormatedate(self,timestamp):
date = datetime.datetime.fromtimestamp(timestamp)
formatted_date = date.strftime('%Y-%m-%d')
return formatted_date
if __name__ == '__main__':
sinaUsstock=SinaUsstock()
# securitiescode= sinaUsstock.r.lpop('sina_usstock:securities_code')
securitiescode= sinaUsstock.getCodeFromRedis()
securitiescode='AAPL'
try:
sinaUsstock.get_content2(securitiescode)
except Exception as e:
sinaUsstock.r.rpush('sina_usstock:securities_code',securitiescode)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
def __init__(self):
self.downUrl="http://stats.customs.gov.cn/queryData/downloadQueryData"
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getcookie(self):
cookie=self.r.spop('hgcookie')
# cookie=self.r.srandmember('hgcookie')
while cookie is None:
time.sleep(10)
cookie=self.r.srandmember('hgcookie')
if cookie is not None:
break
cookie=cookie.decode('utf-8')
cookie=cookie.strip('"')
return cookie
#请求下载文件
def reqDownFile(self,data):
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
data=data
proxy={}
# response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
statuscode=410
filename='数据文件.csv'
while statuscode != 200:
# time.sleep(5)
try:
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
# response.encoding = response.apparent_encoding
response.encoding = 'GB2312'
statuscode=response.status_code
if statuscode == 200:
try:
csv_content = response.text
count = csv_content.count("\n")
csv_content=csv_content.replace('\0', '')
print(count)
# filename='数据文件.csv'
tmppath='D:\\hg\\tmp'
# save_dir = os.path.dirname(tmppath)
os.makedirs(tmppath, exist_ok=True)
filename = os.path.join(tmppath, filename)
with open(filename, 'w') as file:
file.write(csv_content)
print('CSV文件下载保存成功!')
break
except Exception as e:
print(e)
statuscode=411
else:
print('CSV文件下载保存失败!')
except Exception as e:
print(data)
print(e)
statuscode=412
continue
print(f'statuscode:{statuscode}')
return filename
#统计数据的文件路径设置单个字段
def filepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
try:
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'
except Exception as e:
print(e)
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
#统计数据的文件路径设置单个字段
def codeFilepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1,code):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'+field_name+'\\'
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'+field_name+'\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
def getfieldName(self,outerField1):
field_name=''
if 'CODE_TS' in outerField1:
#商品信息
field_name='商品'
elif 'ORIGIN_COUNTRY' in outerField1:
#国家
field_name='贸易伙伴'
elif 'TRADE_MODE' in outerField1:
#
field_name='贸易方式'
elif 'TRADE_CO_PORT' in outerField1:
#国内省份
field_name='收发货地址'
return field_name
def getiETypeName(self,iEType):
iETypeName=''
if 0==iEType:
iETypeName='出口'
elif 1==iEType:
iETypeName='进口'
elif 10==iEType:
iETypeName='进出口'
return iETypeName
#单个字段的参数设置
def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
if year>2021:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'',
'outerField3':'',
'outerField4':'',
'outerValue1':'',
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState, #202201前的数据为2 后的数据是1
'currentStartTime': '202201',
}
return param
#联合查询字段的参数设置
def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
if year>2021:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'CODE_TS',
'outerField3':'',
'outerField4':'',
'outerValue1': filedCode,
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState,
'currentStartTime': '202201',
}
return param
#将临时文件放复制到目录中
def tmpToFile(self,tmpfilename,filePathName):
# 打开csv文件
with open(tmpfilename, 'r') as file:
# 创建csv阅读器
csv_reader = csv.reader(file)
# 使用len()函数获取行数
line_count = len(list(csv_reader))
if line_count > 9995:
print('csv文件行数过大需要对编码进行拆分')
os.remove(tmpfilename)
return ''
else:
shutil.copy(tmpfilename, filePathName)
os.remove(tmpfilename)
return filePathName
def readcsv(self,filePath):
codes=[]
with open(filePath, newline='') as csvfile:
reader = csv.reader(csvfile)
#跳过第一条数据
next(reader)
for row in reader:
# print(row[0])
codes.append(row[0])
return codes
#下载获取字段的编码信息
def field1Down(self,year,endMonth):
fieldFileList=[]
current_date = datetime.now()
# year = current_date.year
# year = int(self.config.get('param', 'year'))
year = int(year)
month = current_date.month
iETypes=[0,1,10]
outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
# outerFields=['CODE_TS']
currencyType='usd'
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
# if endMonth != (month-1):
# return
if endMonth==1:
startMonths=[1]
else:
startMonths=[1,endMonth]
for startMonth in startMonths:
for iEType in iETypes:
for outerField1 in outerFields:
param=self.setparam(iEType,currencyType,year,startMonth,endMonth,outerField1)
filePathName=self.filepath(iEType,currencyType,year,startMonth,endMonth,outerField1)
fieldFileList.append(filePathName)
if os.path.exists(filePathName):
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
return fieldFileList
#下载贸易方式商品,贸易伙伴商品,注册地商品 的统计信息
#1.从单个统计文件中获取对应的贸易编码,
#2.对每个贸易编码进行文件下载
#3.对下载的文件进行合并清洗重命名
def fieldCodeDown(self,iEType,currencyType,year,startMonth,endMonth,outerField1,codes):
codeFileList=[]
for code in codes:
param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
if os.path.exists(filePathName):
print(f'文件已存在{filePathName}')
codeFileList.append(filePathName)
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
codeFileList.append(saveFileName)
return codeFileList
def codeFieldDown(self,fieldFileList,year,endMonth):
current_date = datetime.now()
# year = current_date.year
year = int(year)
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
for fieldFile in fieldFileList:
#['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
try:
if '商品' in fieldFile:
continue
if '贸易伙伴' in fieldFile:
outerField1=['ORIGIN_COUNTRY']
if '贸易方式' in fieldFile:
outerField1=['TRADE_MODE']
if '收发货地址' in fieldFile:
outerField1=['TRADE_CO_PORT']
if '单月' in fieldFile:
startMonth=endMonth
if '累计' in fieldFile:
startMonth=1
if '--进口' in fieldFile:
iEType=1
if '--出口' in fieldFile:
iEType=0
if '--进出口' in fieldFile:
iEType=10
currencyType='usd'
codes=hgDownFile.readcsv(fieldFile)
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
print(f'codes:{len(codeFileList)}')
print(len(codeFileList))
while len(codes)!= len(codeFileList):
print('+++++++++++++')
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
except Exception as e:
print(e)
if __name__ == '__main__':
hgDownFile=HgDownFile()
# hgDownFile.fileNameleiji()
# hgDownFile.fileNamedanyue()
# hgDownFile.tmpToFile(tmpfilename,filePathName)
ss=hgDownFile.config.get('param', 'endMonth')
yss=hgDownFile.config.get('param', 'year')
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
fieldFileList=hgDownFile.field1Down(year,endMonth)
if endMonth==1:
while len(fieldFileList)< 12:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 12:
break
else:
while len(fieldFileList)< 24:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 24:
break
for i in range(1,3):
print('_______________')
hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
def __init__(self):
self.downUrl="http://stats.customs.gov.cn/queryData/downloadQueryData"
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getcookie(self):
cookie=self.r.spop('hgcookie')
# cookie=self.r.srandmember('hgcookie')
while cookie is None:
time.sleep(10)
cookie=self.r.srandmember('hgcookie')
if cookie is not None:
break
cookie=cookie.decode('utf-8')
cookie=cookie.strip('"')
return cookie
#请求下载文件
def reqDownFile(self,data):
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
data=data
proxy={}
# response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
statuscode=410
filename='数据文件.csv'
while statuscode != 200:
# time.sleep(5)
try:
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
# response.encoding = response.apparent_encoding
response.encoding = 'GB2312'
statuscode=response.status_code
if statuscode == 200:
try:
csv_content = response.text
count = csv_content.count("\n")
csv_content=csv_content.replace('\0', '')
print(count)
# filename='数据文件.csv'
tmppath='D:\\hg\\tmp'
# save_dir = os.path.dirname(tmppath)
os.makedirs(tmppath, exist_ok=True)
filename = os.path.join(tmppath, filename)
with open(filename, 'w') as file:
file.write(csv_content)
print('CSV文件下载保存成功!')
break
except Exception as e:
print(e)
statuscode=411
else:
print('CSV文件下载保存失败!')
except Exception as e:
print(data)
print(e)
statuscode=412
continue
print(f'statuscode:{statuscode}')
return filename
#统计数据的文件路径设置单个字段
def filepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
try:
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'
except Exception as e:
print(e)
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
#统计数据的文件路径设置单个字段
def codeFilepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1,code):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'+field_name+'\\'
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'+field_name+'\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
def getfieldName(self,outerField1):
field_name=''
if 'CODE_TS' in outerField1:
#商品信息
field_name='商品'
elif 'ORIGIN_COUNTRY' in outerField1:
#国家
field_name='贸易伙伴'
elif 'TRADE_MODE' in outerField1:
#
field_name='贸易方式'
elif 'TRADE_CO_PORT' in outerField1:
#国内省份
field_name='收发货地址'
return field_name
def getiETypeName(self,iEType):
iETypeName=''
if 0==iEType:
iETypeName='出口'
elif 1==iEType:
iETypeName='进口'
elif 10==iEType:
iETypeName='进出口'
return iETypeName
#单个字段的参数设置
def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
if year==2022 and endMonth==1:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'',
'outerField3':'',
'outerField4':'',
'outerValue1':'',
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState, #202201前的数据为2 后的数据是1
'currentStartTime': '202201',
}
return param
#联合查询字段的参数设置
def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
if year==2022 and endMonth==1:
selectTableState= 1 #202201前的数据为2 后的数据是1
else:
selectTableState= 2 #202201前的数据为2 后的数据是1
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'CODE_TS',
'outerField3':'',
'outerField4':'',
'outerValue1': filedCode,
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState,
'currentStartTime': '202201',
}
return param
#将临时文件放复制到目录中
def tmpToFile(self,tmpfilename,filePathName):
# 打开csv文件
with open(tmpfilename, 'r') as file:
# 创建csv阅读器
csv_reader = csv.reader(file)
# 使用len()函数获取行数
line_count = len(list(csv_reader))
if line_count > 9995:
print('csv文件行数过大需要对编码进行拆分')
os.remove(tmpfilename)
return ''
else:
shutil.copy(tmpfilename, filePathName)
os.remove(tmpfilename)
return filePathName
def readcsv(self,filePath):
codes=[]
with open(filePath, newline='') as csvfile:
reader = csv.reader(csvfile)
#跳过第一条数据
next(reader)
for row in reader:
# print(row[0])
codes.append(row[0])
return codes
#下载获取字段的编码信息
def field1Down(self,year,endMonth):
fieldFileList=[]
current_date = datetime.now()
# year = current_date.year
# year = int(self.config.get('param', 'year'))
year = int(year)
month = current_date.month
iETypes=[0,1,10]
outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
# outerFields=['CODE_TS']
currencyType='usd'
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
# if endMonth != (month-1):
# return
startMonths=[endMonth]
for startMonth in startMonths:
for iEType in iETypes:
for outerField1 in outerFields:
param=self.setparam(iEType,currencyType,year,startMonth,endMonth,outerField1)
filePathName=self.filepath(iEType,currencyType,year,startMonth,endMonth,outerField1)
fieldFileList.append(filePathName)
if os.path.exists(filePathName):
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
return fieldFileList
#下载贸易方式商品,贸易伙伴商品,注册地商品 的统计信息
#1.从单个统计文件中获取对应的贸易编码,
#2.对每个贸易编码进行文件下载
#3.对下载的文件进行合并清洗重命名
def fieldCodeDown(self,iEType,currencyType,year,startMonth,endMonth,outerField1,codes):
codeFileList=[]
for code in codes:
param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
if os.path.exists(filePathName):
print(f'文件已存在{filePathName}')
codeFileList.append(filePathName)
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
codeFileList.append(saveFileName)
return codeFileList
def codeFieldDown(self,fieldFileList,year,endMonth):
current_date = datetime.now()
# year = current_date.year
year = int(year)
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
for fieldFile in fieldFileList:
#['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
try:
if '商品' in fieldFile:
continue
if '贸易伙伴' in fieldFile:
outerField1=['ORIGIN_COUNTRY']
if '贸易方式' in fieldFile:
outerField1=['TRADE_MODE']
if '收发货地址' in fieldFile:
outerField1=['TRADE_CO_PORT']
if '单月' in fieldFile:
startMonth=endMonth
if '累计' in fieldFile:
startMonth=1
if '--进口' in fieldFile:
iEType=1
if '--出口' in fieldFile:
iEType=0
if '--进出口' in fieldFile:
iEType=10
currencyType='usd'
codes=hgDownFile.readcsv(fieldFile)
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
print(f'codes:{len(codeFileList)}')
print(len(codeFileList))
while len(codes)!= len(codeFileList):
print('+++++++++++++')
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
except Exception as e:
print(e)
if __name__ == '__main__':
hgDownFile=HgDownFile()
# hgDownFile.fileNameleiji()
# hgDownFile.fileNamedanyue()
# hgDownFile.tmpToFile(tmpfilename,filePathName)
ss=hgDownFile.config.get('param', 'endMonth')
yss=hgDownFile.config.get('param', 'year')
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
fieldFileList=hgDownFile.field1Down(year,endMonth)
if endMonth==1:
while len(fieldFileList)< 12:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 12:
break
for i in range(1,3):
print('_______________')
hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
import configparser
import csv
import glob
import os
import shutil
import time
import pandas as pd
import redis
import requests
from datetime import datetime
'''
海关下载数据类型和参数分类组合
CODE_TS #商品编码 ORIGIN_COUNTRY #贸易伙伴 TRADE_MODE #贸易方式 TRADE_CO_PORT #收发货地址
1.设置进出口类型 (默认进口,出口,进出口都进行下载)采用遍历的方式
2.设置查询起止时间 默认最新一个月的单月数据,和累计的数据下载
3.设置币种 默认是usd
4.查询字段分组 1.商品详情 四个都设置
5.单个统计数据下载 下载单个分组的数据
6.排序方式,使用默认的编码排序
7.下载文件路径设置和命名规则
d:/hg/2023/7/
数据默认存储位置 D://hg
其它路径从参数中读取
一级 年份
二级月份
三级月份类型单月,累计
四级 币种
五级 字段分组
六级 文件名
3、采集单个字段的统计数据
4.临时文件
1)将请求下载的文件放到临时目录中,
2)对临时的目录文件进行数据的过滤修改重命名保存到对应目录下
3)将临时文件删除
4)根据文件名和列表记录做对比,来下载缺失的文件
5.数据下载分类
1)按照类型分组获取对应的每月的最新编码信息
2)根据字段编码和商品进行对应统计信息的下载
3)根据商品编码下载数据
'''
class HgDownFile(object):
def __init__(self):
self.downUrl="http://stats.customs.gov.cn/queryData/downloadQueryData"
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getcookie(self):
cookie=self.r.spop('hgcookie')
# cookie=self.r.srandmember('hgcookie')
while cookie is None:
time.sleep(10)
cookie=self.r.srandmember('hgcookie')
if cookie is not None:
break
cookie=cookie.decode('utf-8')
cookie=cookie.strip('"')
return cookie
#请求下载文件
def reqDownFile(self,data):
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
data=data
proxy={}
# response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
statuscode=410
filename='数据文件.csv'
while statuscode != 200:
# time.sleep(5)
try:
header={
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding':'gzip, deflate',
'Accept-Language':'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control':'max-age=0',
'Content-Type':'application/x-www-form-urlencoded',
'Host':'stats.customs.gov.cn',
'Origin':'http://stats.customs.gov.cn',
'Proxy-Connection':'keep-alive',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Cookie': self.getcookie()
}
response=requests.post(url=self.downUrl,data=data,headers=header,verify=False,timeout=20)
# response.encoding = response.apparent_encoding
response.encoding = 'GB2312'
statuscode=response.status_code
if statuscode == 200:
try:
csv_content = response.text
count = csv_content.count("\n")
csv_content=csv_content.replace('\0', '')
print(count)
# filename='数据文件.csv'
tmppath='D:\\hg\\tmp'
# save_dir = os.path.dirname(tmppath)
os.makedirs(tmppath, exist_ok=True)
filename = os.path.join(tmppath, filename)
with open(filename, 'w') as file:
file.write(csv_content)
print('CSV文件下载保存成功!')
break
except Exception as e:
print(e)
statuscode=411
else:
print('CSV文件下载保存失败!')
except Exception as e:
print(data)
print(e)
statuscode=412
continue
print(f'statuscode:{statuscode}')
return filename
#统计数据的文件路径设置单个字段
def filepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
try:
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'
except Exception as e:
print(e)
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
#统计数据的文件路径设置单个字段
def codeFilepath(self,iEType,currencyType,year,startMonth,endMonth,outerField1,code):
path='D:\\hg\\'
field_name=self.getfieldName(outerField1)
iEType_name=self.getiETypeName(iEType)
if startMonth<endMonth:
start_str = '01'
end_str = "{:02d}".format(endMonth)
filename=str(year)+start_str+'-'+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\累计\\'+field_name+'\\'
else:
end_str = "{:02d}".format(endMonth)
filename=str(year)+end_str+'--'+field_name+'--'+iEType_name+'-'+str(code)+'.csv'
filepath=path+str(year)+'\\'+end_str+'\\单月\\'+field_name+'\\'
save_dir = os.path.dirname(filepath)
os.makedirs(save_dir, exist_ok=True)
filename = os.path.join(save_dir, filename)
return filename
def getfieldName(self,outerField1):
field_name=''
if 'CODE_TS' in outerField1:
#商品信息
field_name='商品'
elif 'ORIGIN_COUNTRY' in outerField1:
#国家
field_name='贸易伙伴'
elif 'TRADE_MODE' in outerField1:
#
field_name='贸易方式'
elif 'TRADE_CO_PORT' in outerField1:
#国内省份
field_name='收发货地址'
return field_name
def getiETypeName(self,iEType):
iETypeName=''
if 0==iEType:
iETypeName='出口'
elif 1==iEType:
iETypeName='进口'
elif 10==iEType:
iETypeName='进出口'
return iETypeName
#单个字段的参数设置
def setparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1):
# if year>2021:
# selectTableState= 1 #202201前的数据为2 后的数据是1
# else:
# selectTableState= 2 #202201前的数据为2 后的数据是1
selectTableState= 3
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'',
'outerField3':'',
'outerField4':'',
'outerValue1':'',
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState, #202201前的数据为2 后的数据是1
'currentStartTime': '202201',
}
return param
#联合查询字段的参数设置
def setcodesAndProductparam(self,iEType,currencyType,year,startMonth,endMonth,outerField1,filedCode):
# if year>2021:
# selectTableState= 1 #202201前的数据为2 后的数据是1
# else:
# selectTableState= 2 #202201前的数据为2 后的数据是1
selectTableState= 3
param={
'pageSize': 10,
'iEType': iEType,
'currencyType': currencyType,
'year': year,
'startMonth': startMonth,
'endMonth': endMonth,
'monthFlag':'',
'unitFlag': False,
'unitFlag1': False,
'codeLength': '8',
'outerField1': outerField1,
'outerField2':'CODE_TS',
'outerField3':'',
'outerField4':'',
'outerValue1': filedCode,
'outerValue2':'',
'outerValue3':'',
'outerValue4':'',
'orderType': 'CODE ASC DEFAULT',
'selectTableState': selectTableState,
'currentStartTime': '202201',
}
return param
#将临时文件放复制到目录中
def tmpToFile(self,tmpfilename,filePathName):
# 打开csv文件
with open(tmpfilename, 'r') as file:
# 创建csv阅读器
csv_reader = csv.reader(file)
# 使用len()函数获取行数
line_count = len(list(csv_reader))
if line_count > 9995:
print('csv文件行数过大需要对编码进行拆分')
os.remove(tmpfilename)
return ''
else:
shutil.copy(tmpfilename, filePathName)
os.remove(tmpfilename)
return filePathName
def readcsv(self,filePath):
codes=[]
with open(filePath, newline='') as csvfile:
reader = csv.reader(csvfile)
#跳过第一条数据
next(reader)
for row in reader:
# print(row[0])
codes.append(row[0])
return codes
#下载获取字段的编码信息
def field1Down(self,year,endMonth):
fieldFileList=[]
current_date = datetime.now()
# year = current_date.year
# year = int(self.config.get('param', 'year'))
year = int(year)
month = current_date.month
iETypes=[0,1,10]
outerFields=['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
# outerFields=['CODE_TS']
currencyType='usd'
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
# if endMonth != (month-1):
# return
startMonths=[1]
for startMonth in startMonths:
for iEType in iETypes:
for outerField1 in outerFields:
param=self.setparam(iEType,currencyType,year,startMonth,endMonth,outerField1)
filePathName=self.filepath(iEType,currencyType,year,startMonth,endMonth,outerField1)
fieldFileList.append(filePathName)
if os.path.exists(filePathName):
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
return fieldFileList
#下载贸易方式商品,贸易伙伴商品,注册地商品 的统计信息
#1.从单个统计文件中获取对应的贸易编码,
#2.对每个贸易编码进行文件下载
#3.对下载的文件进行合并清洗重命名
def fieldCodeDown(self,iEType,currencyType,year,startMonth,endMonth,outerField1,codes):
codeFileList=[]
for code in codes:
param=self.setcodesAndProductparam(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
filePathName=self.codeFilepath(iEType,currencyType,year,startMonth,endMonth,outerField1,code)
if os.path.exists(filePathName):
print(f'文件已存在{filePathName}')
codeFileList.append(filePathName)
continue
tmpfilename=self.reqDownFile(param)
saveFileName=self.tmpToFile(tmpfilename,filePathName)
print(saveFileName)
codeFileList.append(saveFileName)
return codeFileList
def codeFieldDown(self,fieldFileList,year,endMonth):
current_date = datetime.now()
# year = current_date.year
year = int(year)
# endMonth=self.r.get('newMonth')
# endMonth=int(endMonth.decode('utf-8'))
# endMonth=int(self.config.get('param', 'endMonth'))
for fieldFile in fieldFileList:
#['CODE_TS','ORIGIN_COUNTRY','TRADE_MODE','TRADE_CO_PORT']
try:
if '商品' in fieldFile:
continue
if '贸易伙伴' in fieldFile:
outerField1=['ORIGIN_COUNTRY']
if '贸易方式' in fieldFile:
outerField1=['TRADE_MODE']
if '收发货地址' in fieldFile:
outerField1=['TRADE_CO_PORT']
if '单月' in fieldFile:
startMonth=endMonth
if '累计' in fieldFile:
startMonth=1
if '--进口' in fieldFile:
iEType=1
if '--出口' in fieldFile:
iEType=0
if '--进出口' in fieldFile:
iEType=10
currencyType='usd'
codes=hgDownFile.readcsv(fieldFile)
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
print(f'codes:{len(codeFileList)}')
print(len(codeFileList))
while len(codes)!= len(codeFileList):
print('+++++++++++++')
codeFileList=hgDownFile.fieldCodeDown(iEType,currencyType,year,startMonth,endMonth,outerField1,codes)
except Exception as e:
print(e)
if __name__ == '__main__':
hgDownFile=HgDownFile()
# hgDownFile.fileNameleiji()
# hgDownFile.fileNamedanyue()
# hgDownFile.tmpToFile(tmpfilename,filePathName)
ss=hgDownFile.config.get('param', 'endMonth')
yss=hgDownFile.config.get('param', 'year')
for ye in yss.split(','):
year=int(ye)
for s in ss.split(','):
endMonth=int(s)
fieldFileList=hgDownFile.field1Down(year,endMonth)
if endMonth==1:
while len(fieldFileList)< 12:
fieldFileList=hgDownFile.field1Down(year,endMonth)
if len(fieldFileList)>= 12:
break
for i in range(1,3):
print('_______________')
hgDownFile.codeFieldDown(fieldFileList,year,endMonth)
...@@ -17,9 +17,11 @@ outerField3: TRADE_MODE #贸易方式 ...@@ -17,9 +17,11 @@ outerField3: TRADE_MODE #贸易方式
outerField4: TRADE_CO_PORT #收发货地址 outerField4: TRADE_CO_PORT #收发货地址
海关网站的数据分类
1、近期数据库 2022年1月之后的数据
2、老数据库 2022年1月之前的数据
3、跨库的数据 2022年的累计数据
......
...@@ -7,10 +7,11 @@ import pymysql ...@@ -7,10 +7,11 @@ import pymysql
import requests import requests
import urllib3 import urllib3
from pymysql.converters import escape_string from pymysql.converters import escape_string
import sys
from base.BaseCore import BaseCore sys.path.append('D:\\zzsn_spider\\base')
import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore() baseCore =BaseCore.BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
headers = { headers = {
...@@ -146,6 +147,7 @@ def getToken(): ...@@ -146,6 +147,7 @@ def getToken():
pass pass
else: else:
#没有查到token #没有查到token
log.info("没有拿到token")
return False return False
return row[0] return row[0]
...@@ -161,8 +163,11 @@ def getPageData(dic_url,page): ...@@ -161,8 +163,11 @@ def getPageData(dic_url,page):
if tokenAndCookie: if tokenAndCookie:
pass pass
else: else:
log.info("没有拿到token,开始递归")
while True: while True:
log.info("没有拿到token,开始休眠")
time.sleep(60) time.sleep(60)
log.info("没有拿到token,结束休眠")
tokenAndCookie = getToken() tokenAndCookie = getToken()
if tokenAndCookie: if tokenAndCookie:
break break
...@@ -262,8 +267,7 @@ if __name__=="__main__": ...@@ -262,8 +267,7 @@ if __name__=="__main__":
log.info("redis已经没有数据了,重新放置数据") log.info("redis已经没有数据了,重新放置数据")
getFromSql() getFromSql()
time.sleep(60) time.sleep(60)
continue infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
# infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
getWxList(infoSourceCode) getWxList(infoSourceCode)
# infoSourceCode = 'IN-20220917-0159' # infoSourceCode = 'IN-20220917-0159'
......
import json
from operator import itemgetter
from itertools import groupby
files=r'D:\hg\3.json'
# 读取JSON文件
with open(files, 'r') as file:
data = json.load(file)
data=data['facts']['us-gaap']
zb=[]
# 遍历字典的key值
for key in data.keys():
accounts=data[key]['units']
for kk in accounts.keys():
accounts=accounts[kk]
for account in accounts:
end=str(account['end'])
val=str(account['val'])
fp=str(account['fp'])
form=str(account['form'])
zhibiao={
'zbname':key,
'riqi':end,
'jine':val,
'fp':fp,
'form':form,
}
zb.append(zhibiao)
#找10-K的数据
# 根据age属性对列表内容进行分类
zb.sort(key=itemgetter('form')) # 先按照age属性进行排序
forms = groupby(zb, key=itemgetter('form')) # 根据age属性进行分组
form_25_data = []
for form, zz in forms:
if '10-K' in form :
form_25_data.extend(list(zz))
# 根据age属性对列表内容进行分类
form_25_data.sort(key=itemgetter('riqi')) # 先按照age属性进行排序
groups = groupby(form_25_data, key=itemgetter('riqi')) # 根据age属性进行分组
# 遍历每个分组,并打印分类结果
for riqi, group in groups:
print(f"riqi: {riqi}")
for item in group:
print(item)
print()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论