微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – 在级联中构建自定义连接逻辑,仅确保MAP_SIDE

我有3个级联管道(一个连接到另外两个),描述如下,

> LHSPipe – (较大尺寸)

> RHSPipes – (可能适合内存的较小尺寸)

Psuedocode如下,此示例涉及两个连接

如果F1DecidingFactor = YES则
使用RHS查找#1 BY(LHSPipe.F1Input = RHS查找#1.Join#F1)加入LHSPipe并设置查找结果(SET LHSPipe.F1Output = Result#F1)
除此以外
SET LHSPipe.F1Output = N / A

相同的逻辑适用于F2计算.

预期产量,

IF-ELSE决定是否加入,这种情况迫使我进入自定义加入操作.

考虑到上述情况,我想去MAP-SIDE加入(保持RHSPipe在MAP任务节点的内存中),我正在考虑以下可能的解决方案,每个都有其优缺点.需要你对这些建议.

选项1:

CoGroup – 我们可以使用带有BufferJoiner的CoGroup和自定义连接(操作)来构建自定义连接逻辑,但是不能确保MAP-SIDE连接.

选项2:

HashJoin – 它确保MAP-SIDE加入,但就我所见,无法使用此方式构建自定义连接.

请更正我的理解,并提出您的意见以处理此要求.

提前致谢.

解决方法

解决这个问题的最好方法(我可以想到)是修改你的较小的数据集.您可以向较小的数据集添加新的字段(F1DecidingFactor). F1Result的价值可以如下:

Sudo代码

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

结果表

|F1#Join|F1#Result|F1#DecidingFactor|
|    Yes|        0|             True|
|    Yes|        1|            False|
|     No|        0|              N/A|
|     No|        1|              N/A|

您也可以通过级联进行上述操作.

之后,您可以进行地图侧的加入.

如果修改较小的数据集是不可能的,那么我有2个选项来解决问题.

选项1

将新的字段添加到您的小管道,这相当于您决定的因素(即F1DecidingFactor_RHS =是).然后将其包含在您的加入条件中.一旦你的加入完成,你将只有这些条件匹配的那些行的值.否则将为空/空白.示例代码

主班

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {
    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);

        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor","F1Output","F2DecidingFactor","F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field to small pipe. Expected Fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne,new Insert(f1DecidingFactorRhs,"Yes"),Fields.ALL);

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field to small pipe. Expected Fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo,Fields.ALL);

        // Joining first small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe,lhsJoinerOne,rhsOne,rhsJoinerOne,new LeftJoin());

        // Joining second small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne,lhsJoinerTwo,rhsTwo,rhsJoinerTwo,new LeftJoin());

        Pipe result = new Each(resultsTwo,functionFields,new TestFunction(),Fields.REPLACE);

        result = new discard(result,f1DecidingFactorRhs);
        result = new discard(result,f2DecidingFactorRhs);

        // result Pipe should have expected result
    }
}

选项2

如果要使用认值而不是null / blank,那么建议先使用认的Joiners进行HashJoin,然后使用一个函数来更新具有适当值的元组.就像是:

主班

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {
    public StackHashJointest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor","F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe,f1Input,f1Join,new LeftJoin());

        // Joining second small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne,f2Input,f2Join,Fields.REPLACE);

        // result Pipe should have expected result
    }
}

更新功能

import cascading.flow.FlowProcess;
import cascading.operation.BaSEOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaSEOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected Fields: "F1DecidingFactor","F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process,FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        TupleEntry result = new TupleEntry(arguments);

        if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F1Output",DEFAULT_VALUE);
        }

        if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F2Output",DEFAULT_VALUE);
        }

        call.getoutputCollector().add(result);
    }

}

参考

> Insert Function
> Custom Function
> HashJoin

这应该可以解决你的问题.让我知道这是否有帮助.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


应用场景 C端用户提交工单、工单创建完成之后、会发布一条工单创建完成的消息事件(异步消息)、MQ消费者收到消息之后、会通知各处理器处理该消息、各处理器处理完后都会发布一条将该工单写入搜索引擎的消息、最终该工单出现在搜索引擎、被工单处理人检索和处理。 事故异常体现 1、异常体现 从工单的流转记录发现、
线程类,设置有一个公共资源 package cn.org.chris.concurrent; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Descrip
Java中的数字(带有0前缀和字符串)
在Java 9中使用JLink的目的是什么?
Java Stream API Filter(过滤器)
在Java中找到正数和负数数组元素的数量
Java 9中JShell中的不同启动脚本是什么?
使用Java的位填充错误检测技术
java中string是什么
如何使用Java中的JSON-lib API将Map转换为JSON对象?