微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

有没有一种方法可以将作业调度到Apache Flink中的特定处理器?

如何解决有没有一种方法可以将作业调度到Apache Flink中的特定处理器?

我是Apache Flink的新用户,我目前的目标是在异构处理系统上测试调度算法。因此,我将每个作业部署到哪个处理器变得非常重要。但是,我找不到如何指定将作业部署到的处理器ID,也找不到找到使处理器返回其可用性的方法。 如果您能给我一些提示,我将非常感谢您的帮助。希望您过得愉快:)

解决方法

我通过一个类似的问题来计划和监视flink子任务到计算机的特定CPU内核。我对问题(https://github.com/OpenHFT/Java-Thread-Affinity)使用LinuxJNAAffinity。也许您可以基于我的解决方案。这是我的UDF之一。

import java.util.BitSet;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.sense.flink.pojo.Point;
import org.sense.flink.pojo.ValenciaItem;
import org.sense.flink.util.CRSCoordinateTransformer;
import org.sense.flink.util.CpuGauge;
import org.sense.flink.util.SimpleGeographicalPolygons;

import net.openhft.affinity.impl.LinuxJNAAffinity;

public class ValenciaItemDistrictMap extends RichMapFunction<ValenciaItem,ValenciaItem> {
    private static final long serialVersionUID = 624354384779615610L;
    private SimpleGeographicalPolygons sgp;
    private transient CpuGauge cpuGauge;
    private BitSet affinity;
    private boolean pinningPolicy;

    public ValenciaItemDistrictMap() {
        this(false);
    }

    public ValenciaItemDistrictMap(boolean pinningPolicy) {
        this.pinningPolicy = pinningPolicy;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sgp = new SimpleGeographicalPolygons();
        this.cpuGauge = new CpuGauge();
        getRuntimeContext().getMetricGroup().gauge("cpu",cpuGauge);

        if (this.pinningPolicy) {
            // listing the cpu cores available
            int nbits = Runtime.getRuntime().availableProcessors();
            // pinning operator' thread to a specific cpu core
            this.affinity = new BitSet(nbits);
            affinity.set(((int) Thread.currentThread().getId() % nbits));
            LinuxJNAAffinity.INSTANCE.setAffinity(affinity);
        }
    }

    @Override
    public ValenciaItem map(ValenciaItem value) throws Exception {
        // updates the CPU core current in use
        this.cpuGauge.updateValue(LinuxJNAAffinity.INSTANCE.getCpu());
        System.err.println(ValenciaItemDistrictMap.class.getSimpleName() + " thread[" + Thread.currentThread().getId()
                + "] core[" + this.cpuGauge.getValue() + "]");

        List<Point> coordinates = value.getCoordinates();
        boolean flag = true;
        int i = 0;
        while (flag) {
            Tuple3<Long,Long,String> adminLevel = sgp.getAdminLevel(coordinates.get(i));
            if (adminLevel.f0 != null && adminLevel.f1 != null) {
                value.setId(adminLevel.f0);
                value.setAdminLevel(adminLevel.f1);
                value.setDistrict(adminLevel.f2);
                flag = false;
            } else {
                i++;
            }
        }
        if (flag) {
            // if we did not find a district with the given coordinate we assume the
            // district 16
            value.clearCoordinates();
            value.addCoordinates(
                    new Point(724328.279007,4374887.874634,CRSCoordinateTransformer.DEFAULT_CRS_EPSG_25830));
            value.setId(16L);
            value.setAdminLevel(9L);
            value.setDistrict("Benicalap");
        }
        return value;
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。