How to solve collecting a table with sparklyr in Databricks
I have a parquet table with roughly 5 billion rows. After all of my operations with sparklyr, it is reduced to 1,880,573 rows and 629 columns. When I try to collect it with sdf_collect() so I can run a Factor Analysis, it gives me this memory error:
Error : org.apache.spark.sql.execution.OutOfMemorySparkException: Total memory usage during row decode exceeds spark.driver.maxResultSize (4.0 GB). The average row size was 5.0 KB
Is 1,880,573 rows x 629 columns too big for sparklyr to collect? Also, checking the row count with data %>% dplyr::count() took 9 minutes. How can I reduce this time?
Solution
Yes, 1,880,573 rows x 629 columns is too large to collect. It is not just a finicky operation; your R instance will have serious trouble pulling that into local memory. The error message itself shows why: at an average row size of 5.0 KB, 1,880,573 rows come to roughly 9 GB, more than double the 4.0 GB spark.driver.maxResultSize limit.
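If you control the cluster configuration, one workaround is to raise the driver-side limits before connecting. Below is a minimal sketch, assuming sparklyr's Databricks connection method; the values are illustrative, and on Databricks these settings are usually managed through the cluster's Spark config rather than in code.

library(sparklyr)

# Illustrative values only; size them to your cluster.
conf <- spark_config()
conf$spark.driver.maxResultSize <- "12g"  # the 4 GB limit in the error is below the ~9 GB result
conf$spark.driver.memory        <- "16g"  # the driver also has to hold the collected data

sc <- spark_connect(method = "databricks", config = conf)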
Regarding count(): once the data has been reduced to this size, it should not take 9 minutes. One thing you can try is to reduce the count to a single variable, as sketched below.
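A hypothetical sketch of that idea; the column name id is an assumption, and the gain comes from parquet only having to scan one column instead of all 629.

library(dplyr)

# Count rows after pruning the table down to a single column.
data %>%
  select(id) %>%   # "id" is a placeholder for any one column in your table
  count()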
That said, other than increasing your Spark session parameters (i.e., the number of executors), I do not think there is a way to speed this step up significantly.
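For reference, a sketch of what requesting more executor resources looks like in a sparklyr config; the numbers are assumptions, it presumes dynamic allocation is off, and on Databricks the cluster size normally controls this instead.

conf <- sparklyr::spark_config()
conf$spark.executor.instances <- 8     # illustrative executor count
conf$spark.executor.memory    <- "8g"  # illustrative per-executor memory
sc <- sparklyr::spark_connect(method = "databricks", config = conf)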
If you can, I would suggest doing the factor analysis inside Spark, or working with a smaller sample of the data.
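A sketch of the sampling route; the fraction and seed are arbitrary. (As far as I know, Spark MLlib has no factor-analysis routine as such, but the related sparklyr::ml_pca() runs entirely inside Spark.)

# Take a reproducible 10% sample, then collect only the sample.
sample_tbl <- sparklyr::sdf_sample(data, fraction = 0.1, replacement = FALSE, seed = 42)
local_df   <- sparklyr::sdf_collect(sample_tbl)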
Alternatively, use sparklyr::spark_write_* to export the Spark dataframe to disk, then read it into your R session. Parquet is a good choice here because its reads and writes are fast and compact. Use sparklyr::sdf_repartition to repartition the Spark dataframe into a single partition before the write operation, so that it produces one file; that is better than reading multiple files into R and row-binding them afterwards.
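A minimal sketch of that workflow. The path is an assumption; on Databricks, a location Spark writes as dbfs:/... is visible to the driver's R session under /dbfs/..., and the arrow package handles the local read.

library(dplyr)

# Write the reduced table as a single parquet file...
data %>%
  sparklyr::sdf_repartition(partitions = 1) %>%
  sparklyr::spark_write_parquet(path = "dbfs:/tmp/reduced_table", mode = "overwrite")

# ...then read it straight into the R session without going through collect().
local_df <- arrow::open_dataset("/dbfs/tmp/reduced_table") %>% collect()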
As a general rule, avoid using the collect function on "large" dataframes (what counts as large depends on your Spark configuration and available RAM), because it brings all of the data to the driver node.