微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink如何做维表关联?

使用

RichAsyncFunction 加 CacheBuilder


CacheBuilder.newBuilder()
        //最多存储10000条
        .maximumSize(10000)
        //过期时间为1分钟
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .build();
public class LRU extends RichAsyncFunction<String,Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //创建hbase客户端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多存储10000条
                .maximumSize(10000)
                //过期时间为1分钟
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }
    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
        JSONObject jsonObject = JSONObject.parSEObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //读缓存
        String cacheCityName = cache.getIfPresent(cityId);
        //如果缓存获取失败再从hbase获取维度数据
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {
            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<keyvalue>>) arg -> {
                for (keyvalue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐