如何解决Windows上的Python多处理错误阻止代码执行
我正在尝试使用此库提取Binance历史数据。我已经在Macbook上使用了它,但是效果很好,但是在Windows上,它一直在标记多处理器错误。如何使它正常工作?
# coding=utf-8
import os
import sys
import shutil
import multiprocessing as mp
from itertools import repeat as re
from collections import namedtuple
from itertools import chain
from dateutil.tz import tzutc
import json
import csv
import datetime
import time
from dateutil.rrule import rrule,DAILY
import binance.client
class DataClient(object):
def __init__(self,**kwargs):
self.platform = sys.platform
self.pathname = os.path.dirname(sys.argv[0])
self.full_file_path = self.os_dir_suffix(os.path.abspath(self.pathname))
self.titles = ['Opened','Open','High','Low','Close','Volume']
self.fields = ['opened','open_','high','low','close_','volume']
self.csv_dates = {}
self.progress_statements = False
self.futures = kwargs.get("futures",False)
def os_dir_suffix(self,intended_dir):
if self.platform == 'win32':
dir_format = '{}\\'.format(intended_dir)
else:
dir_format = '{}/'.format(intended_dir)
return dir_format
def get_binance_pairs(self,**kwargs):
base_currencies = kwargs.get('base_currencies','')
quote_currencies= kwargs.get('quote_currencies','')
binance_pairs = list()
if not self.futures:
all_tickers = binance.client.Client(None,None).get_all_tickers()
else:
all_tickers = binance.client.Client(None,None).futures_ticker()
if base_currencies and quote_currencies:
input_pairs = [x+y for x in quote_currencies for y in base_currencies]
for x,currency_pair in enumerate(all_tickers):
if base_currencies and quote_currencies:
for pair in input_pairs:
if currency_pair['symbol'] == pair.upper():
binance_pairs.append(currency_pair['symbol'])
break
elif base_currencies:
for base_currency in base_currencies:
if currency_pair['symbol'][-len(base_currency):] == base_currency.upper():
binance_pairs.append(currency_pair['symbol'])
break
elif quote_currencies:
for quote_currency in quote_currencies:
if currency_pair['symbol'][:len(quote_currency)] == quote_currency.upper():
binance_pairs.append(currency_pair['symbol'])
break
else:
binance_pairs.append(currency_pair['symbol'])
if binance_pairs:
return binance_pairs
else:
raise ValueError('Invalid Input: Binance returned no matching currency pairs.')
def get_earliest_valid_timestamp(self,pair,interval):
client = binance.client.Client(None,None)
"""Get earliest valid open timestamp from Binance
:param symbol: Name of symbol pair e.g BNBBTC
:type symbol: str
:param interval: Binance Kline interval
:type interval: str
:return: first valid timestamp
"""
if not self.futures:
kline = client.get_klines(
symbol=pair,interval=interval,limit=1,startTime=0,endTime=int(time.time() * 1000)
)
else:
kline = client.futures_klines(
symbol=pair,endTime=int(time.time() * 1000)
)
return datetime.datetime.fromtimestamp(float(kline[0][0])/1000)
def process_csv_dates(self,interval,**kwargs):
start_date = kwargs.get('start_date','')
end_date = kwargs.get('end_date','')
first_csv_date = self.csv_dates.get('first','')
last_csv_date = self.csv_dates.get('last','')
csv_dir = self.csv_dates.get('dir','')
date_ranges=list()
earliest_timestamp = self.get_earliest_valid_timestamp(pair,interval)
if not start_date and not first_csv_date and not end_date:
end_date = datetime.datetime.utcNow()
date_ranges.append([earliest_timestamp,end_date])
elif not start_date and not first_csv_date and end_date:
if end_date<earliest_timestamp:
raise ValueError('Invalid Date Range: end date is prior to Binance open date of 07/14/2017')
date_ranges.append([earliest_timestamp,end_date])
elif start_date and not first_csv_date and end_date:
if start_date<earliest_timestamp:
start_date = earliest_timestamp
if end_date<start_date:
raise ValueError('Invalid Date Range: end date is before start date')
date_ranges.append([start_date,end_date])
elif not start_date and not end_date and first_csv_date:
end_date = datetime.datetime.utcNow()
date_ranges.append([last_csv_date,end_date])
past_csv_files = sorted(os.listdir(csv_dir))
os.remove(''.join([csv_dir,past_csv_files[-1]]))
else:
if start_date<earliest_timestamp:
start_date = earliest_timestamp
if end_date<start_date:
raise ValueError('Invalid Date Range: end date is before start date')
elif start_date<=first_csv_date and end_date>=last_csv_date:
date_ranges.append([start_date,first_csv_date-datetime.timedelta(days=1)])
date_ranges.append([last_csv_date,end_date])
past_csv_files = sorted(os.listdir(csv_dir))
os.remove(''.join([csv_dir,past_csv_files[-1]]))
elif start_date>=first_csv_date and end_date>=last_csv_date:
date_ranges.append([last_csv_date,past_csv_files[-1]]))
elif start_date<=first_csv_date and end_date<=last_csv_date:
date_ranges.append([start_date,first_csv_date-datetime.timedelta(days=1)])
else:
return None
dates = list(chain.from_iterable([[date for date in rrule(DAILY,dtstart=x[0],until=x[1]+datetime.timedelta(days=1))] for x in date_ranges]))
return dates
def process_kline_output(self,output):
output_data = {
'opened':{
'title':'Opened','var':'opened'
},'open':{
'title':'Open','var':'open_'
},'high':{
'title':'High','var':'high'
},'low':{
'title':'Low','var':'low'
},'close':{
'title':'Close','var':'close_'
},'volume':{
'title':'Volume','var':'volume'
},'closed':{
'title':'Closed','var':'closed'
},'quote_volume':{
'title':'Quote Asset Volume','var':'quote_volume'
},'total_Trades':{
'title':'Total Trades','var':'total_Trades'
},'taker_buy_base_volume':{
'title':'Taker Buy Base Asset Volume ','var':'taker_buy_base_volume'
},'taker_buy_quote_volume':{
'title':'Taker Buy Quote Asset Volume','var':'taker_buy_quote_volume'
},}
if output:
self.titles=[]
self.fields =[]
for x in output:
try:
self.titles.append(output_data.get(x,'')['title'])
self.fields.append(output_data.get(x,'')['var'])
except TypeError:
raise ValueError('Invalid Output Field: valid fields include opened,open,high,low,close,volume,closed,quote_volume,total_Trades,taker_buy_base_volume,taker_buy_quote_volume')
def create_csv_directories(self,pair_list,kline_interval,historical_price_data_directory=None):
if not historical_price_data_directory:
historical_price_data_directory = '{}historical_price_data'.format(self.full_file_path)
try:
os.makedirs(historical_price_data_directory)
except OSError:
pass
kline_interval_directory = ''.join([self.os_dir_suffix(historical_price_data_directory),'{}_data'.format(kline_interval)])
try:
os.makedirs(kline_interval_directory)
except OSError:
pass
for x,p in enumerate(pair_list):
pair_directory = ''.join([self.os_dir_suffix(kline_interval_directory),'{}'.format(str(p))])
try:
os.makedirs(pair_directory)
except OSError:
pass
individual_csvs_directory = ''.join([self.os_dir_suffix(pair_directory),'individual_csvs'])
try:
os.makedirs(individual_csvs_directory)
except OSError:
pass
return kline_interval_directory
def date_to_milliseconds(self,date):
epoch = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=tzutc())
return int((date - epoch).total_seconds() * 1000.0)
def interval_to_milliseconds(self,interval):
ms = None
seconds_per_unit = {'m': 60,'h': 60 * 60,'d': 24 * 60 * 60,'w': 7 * 24 * 60 * 60}
unit = interval[-1]
if unit in seconds_per_unit:
try:
ms = int(interval[:-1]) * seconds_per_unit[unit] * 1000
except ValueError:
pass
return ms
def get_historical_klines(self,symbol,start,end):
client = binance.client.Client(None,None)
output_data = []
limit = 1000
timeframe = self.interval_to_milliseconds(interval)
start_ts = self.date_to_milliseconds(start)
end_ts = self.date_to_milliseconds(end)
idx = 0
symbol_existed = False
while True:
try:
if not self.futures:
temp_data = client.get_klines(symbol=symbol,limit=limit,startTime=start_ts,endTime=end_ts)
else:
temp_data = client.futures_klines(symbol=symbol,endTime=end_ts)
if not symbol_existed and len(temp_data):
symbol_existed = True
if symbol_existed:
output_data += temp_data
start_ts = temp_data[len(temp_data) - 1][0] + timeframe
else:
start_ts += timeframe
idx += 1
except Exception as e:
print (str(e))
idx+=1
try:
if len(temp_data) < limit:
break
except:
break
return output_data
def past_csv_check(self,kline_interval_directory,pair):
individual_csvs_directory = ''.join([self.os_dir_suffix(kline_interval_directory),self.os_dir_suffix(pair),self.os_dir_suffix('individual_csvs')])
past_csv_files = sorted(os.listdir(individual_csvs_directory))
if past_csv_files:
self.csv_dates['first'] = datetime.datetime.strptime(past_csv_files[0].replace('.csv',''),'%Y-%m-%d')
self.csv_dates['last'] = datetime.datetime.strptime(past_csv_files[-1].replace('.csv','%Y-%m-%d')
self.csv_dates['dir'] = individual_csvs_directory
else:
self.csv_dates = {}
def concatenate_csvs(self,csv_file_info):
for x,file_info in enumerate(csv_file_info):
pair,output_path,interval = file_info
concat_csv_path = self.os_dir_suffix(os.path.join(*(output_path.rsplit(os.path.sep,2)[:-2])))
individual_csv_files = sorted([f for f in os.listdir(output_path) if f.endswith('.csv')])
old_concat_csvs = [f for f in os.listdir(concat_csv_path) if f.endswith('.csv') ]
concat_csv = '{}.csv'.format(pair)
if concat_csv in old_concat_csvs:
old_concat_csvs_path = '{}{}'.format(concat_csv_path,self.os_dir_suffix('old_concatenated_csvs'))
try:
os.makedirs(old_concat_csvs_path)
except OSError:
pass
shutil.move('{}{}'.format(concat_csv_path,concat_csv),'{}{}'.format(self.os_dir_suffix(old_concat_csvs_path),concat_csv))
if individual_csv_files:
for x,csv_file in enumerate(individual_csv_files):
outpath = '{}{}'.format(concat_csv_path,concat_csv)
fout=open(outpath,'a')
full_csv_file_path = '{}{}'.format(output_path,csv_file)
writer = csv.writer(fout,lineterminator='\n')
with open(full_csv_file_path) as f:
if x != 0:
f.__next__()
for line in f:
if len(line)>1:
writer.writerow([x.strip() for x in line.split(',')])
f.close()
fout.close()
def kline_to_csv(self,start_date,end_date,csv_file_info):
csv_dates = self.past_csv_check(kline_interval_directory,pair)
date_range = self.process_csv_dates(pair,start_date=start_date,end_date=end_date)
if not date_range:
return
output_path = ''.join([self.os_dir_suffix(kline_interval_directory),self.os_dir_suffix('individual_csvs')])
for x,date in enumerate(date_range):
if date != date_range[-1]:
if date_range[x+1]!=date+datetime.timedelta(days=1):
continue
else:
year = str(date.year)
numerical_month = str(date.month)
start = date.replace(tzinfo=tzutc())
end = date_range[x+1].replace(tzinfo=tzutc())
if self.progress_statements==True:
print ('currency pair: {} start: {} end: {}'.format(pair,end))
klines = self.get_historical_klines(pair,end)
if klines:
if int(date.day) in range(1,10):
csv_day = '0{}'.format(str(date.day))
else:
csv_day = str(date.day)
if int(date.month) in range(1,10):
csv_month ='{}-0{}-'.format(year,numerical_month)
else:
csv_month = '{}-{}-'.format(year,numerical_month)
results_csv = '{}{}{}.csv'.format(output_path,csv_month,csv_day)
with open(results_csv,'a') as f:
writer = csv.writer(f)
writer.writerow(self.titles)
if interval in ['1m','3m','5m','15m','30m']:
del klines[-1]
for y,kline in enumerate(klines):
self.open_timestamp,self.open_,self.high,self.low,self.close_,self.volume,self.close_timestamp,self.quote_volume,self.total_Trades,self.taker_buy_base_volume,self.taker_buy_quote_volume,ignore = kline
self.opened = datetime.datetime.utcfromtimestamp(float(self.open_timestamp)/1000).strftime('%Y-%m-%d %H:%M:%s')
self.closed = datetime.datetime.utcfromtimestamp(float(self.close_timestamp)/1000).strftime('%Y-%m-%d %H:%M:%s')
csv_fields = [getattr(self,field) for field in self.fields]
with open(results_csv,'a') as f:
writer = csv.writer(f)
writer.writerow(csv_fields)
file_retrevial_info = pair,interval
csv_file_info.append(file_retrevial_info)
def kline_data(self,'')
storage = kwargs.get('storage','')
output = kwargs.get('output','')
progress_statements = kwargs.get('progress_statements','')
if start_date:
start_date = datetime.datetime.strptime(start_date,'%m/%d/%Y')
if end_date:
end_date = datetime.datetime.strptime(end_date,'%m/%d/%Y')
valid_kline_intervals = ['1m','30m','1h','2h','4h','6h','8h','12h']
if interval not in set(valid_kline_intervals):
raise ValueError('Invalid Interval: Kline interval should be one of the following - {}'.format(','.join(valid_kline_intervals)))
output = self.process_kline_output(output)
if not storage:
storage = ['csv',None]
try:
storage_method,intended_dir = storage
except ValueError:
storage_method = storage[0]
intended_dir = None
if progress_statements:
self.progress_statements = progress_statements
if storage_method.lower() == 'csv':
kline_interval_directory = self.create_csv_directories(pair_list,intended_dir)
csv_file_info = mp.Manager().list()
pair = [currency_pair for i,currency_pair in enumerate(pair_list)]
lock = mp.Lock()
pool = mp.Pool(processes=mp.cpu_count(),initargs=(lock,))
# data = pool.starmap(self.kline_to_csv,zip(pair,re(start_date),re(end_date),re(kline_interval_directory),re(interval),re(titles),re(fields),re(csv_file_info)))
data = pool.starmap(self.kline_to_csv,re(csv_file_info)))
pool.close()
pool.join()
self.concatenate_csvs(set(list(csv_file_info)))
else:
raise ValueError('Invalid Storage Type: Currently only csv storage supported')
这是我要根据密码进行的通话
from binance_data.client import DataClient
pair_list = DataClient().get_binance_pairs(base_currencies=['USDT'],quote_currencies=['ETH','BTC','BNB','HOT'])
store_data = DataClient().kline_data(pair_list,'1m',start_date='08/18/2020',end_date='08/25/2019',storage=['csv','C:\\Users\\Documents'],progress_statements=True)
我遇到此错误
C:\Users\GEGE-01\Documents\Arbitrage\Binance\binance-data>python binance-his-data.py
Traceback (most recent call last):
File "<string>",line 1,in <module>
File "C:\python3\lib\multiprocessing\spawn.py",line 116,in spawn_main
exitcode = _main(fd,parent_sentinel)
File "C:\python3\lib\multiprocessing\spawn.py",line 125,in _main
prepare(preparation_data)
File "C:\python3\lib\multiprocessing\spawn.py",line 236,in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\python3\lib\multiprocessing\spawn.py",line 287,in _fixup_main_from_path
main_content = runpy.run_path(main_path,File "C:\python3\lib\runpy.py",line 265,in run_path
return _run_module_code(code,init_globals,run_name,line 97,in _run_module_code
_run_code(code,mod_globals,line 87,in _run_code
exec(code,run_globals)
File "C:\Users\GEGE-01\Documents\Arbitrage\Binance\binance-data\binance-his-data.py",line 4,in <module>
store_data = DataClient().kline_data(pair_list,'C:\\Users\\GEGE-01\\Documents\\Arbitrage\\Binance\\binance-data\\15min'],progress_statements=True)
File "C:\python3\lib\site-packages\binance_data\client.py",line 384,in kline_data
csv_file_info = mp.Manager().list()
File "C:\python3\lib\multiprocessing\context.py",line 57,in Manager
m.start()
File "C:\python3\lib\multiprocessing\managers.py",line 579,in start
self._process.start()
File "C:\python3\lib\multiprocessing\process.py",line 121,in start
self._popen = self._Popen(self)
File "C:\python3\lib\multiprocessing\context.py",line 327,in _Popen
return Popen(process_obj)
File "C:\python3\lib\multiprocessing\popen_spawn_win32.py",line 45,in __init__
prep_data = spawn.get_preparation_data(process_obj._name)
File "C:\python3\lib\multiprocessing\spawn.py",line 154,in get_preparation_data
_check_not_importing_main()
File "C:\python3\lib\multiprocessing\spawn.py",line 134,in _check_not_importing_main
raise RuntimeError('''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
已经读过它,我知道这是Windows上Python的常见错误。不是Python的人,我似乎无法弄清楚。我已经看到了一些关于SO的答案,但这对我来说很困惑。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。