微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用KTables中的数据丰富卡夫卡流

如何解决使用KTables中的数据丰富卡夫卡流

我目前维持财务申请。尽管此财务应用程序中进行了许多计算,但其中一项计算是确定1)新的传入交易占总交易额的百分比? 2)新交易相对于同一客户占给定客户交易总额的百分比?

为简单起见,让我们假设交易数据将在每天早上6点截断,也就是程序启动的时间。换句话说,对于给定的一天,我们在这里主要使用静态数据。

例如:

  • 交易1:客户1-> 100美元
  • 交易2:客户1-> 100美元
  • 交易3:客户2-> 100美元

我想知道的是,交易1占总交易的33%。交易1占客户1总交易的50%。

以下是到目前为止的代码的略微简化版本,它作为单个Java进程运行,并且所有数据与堆存储在同一进程中(因此此处没有进程间通信)。

DAO类:维护应用程序数据

public class ApplicationDataDao {
    private Map<String,Transaction> transactionsByTransactionId;
    private Map<String,Transcation> transcationsByCustomerId;
    private TranscationAggregate transcationAggregate;
    private Map<String,TranscationAggregate> transactionAggregateByCustomerId;

    //constructor,getters and setters to populate these maps and to retrieve data 
    from these maps
}

交易类:表示交易

public class Transaction {
     private String transcationId;
     private String customerId;
     private BigDecimal transcationAmount;
     
     private BigDecimal transcationPercentageAllocation;
     private BigDecimal customerPercentageAllocation;
}

汇总类:在交易级别和客户级别保存汇总总数。

public class TranscationAggregate {
    private BigDecimal totalTranscationAmount = BigDecimal.ZERO;

    private String trancationId;
    private String customerId;
         
    private void aggregate(BigDecimal currentTranscationAmount) {
        totalTranscationAmount.add(currentTranscationAmount);            
    }      
 
}

从今天的截止文件中读取数据

    ApplicationDataDao dao = getSingletonApplicationDataDao();
    
    for(String line : reader.read()) {

         String []tokens = line.split(",");
         Transaction transaction = new Transaction();
         transaction.setTransactionId(tokens[0]);
         transaction.setCustomerId(tokens[1]);
         transcation.setTransactionAmount(tokens[2]);
         dao.putTransactionByTransactionId(transaction.getTranscationId());
         dao.putTranscationByCustomerId(transcation.getCustomerId());     
         //Keep a track of the total transaction amount and total transaction amount by customer id.
         dao.getTranscationAggregate().aggregate(transcation.getTranscationAmount());
         dao.getTranscationAggregateByCustomerId(transcation.getCustomerId()).
         aggregate(transcation.getTranscationAmount());

        
                    
      }

计算交易相对于其他交易的百分比分配

      for(Transaction transaction : dao.getTranscationsByTranscationId().values()) {
                  transaction.setTranscationPercentageAllocation(transaction.getTranscationAmount().divide(dao.getTransactionAggregate().getTotalTransactionAmount())
     }

计算客户交易相对于同一客户的其他交易的百分比分配

     for(TransactionAggregate transactionAggregate : dao.getTranscationAggregateByCustomerId()) {
       Transaction transaction = dao.getTranscationByCustomerId(transactionAggregate .getCustomerId());
       transaction.setCustomerPercentageAllocation(transaction.getTranscationAmount().divide(transactionAggregate.getTotalTransactionAmount())
     }

到目前为止,此应用程序在其他团队使用的专用UNIX盒上运行。换句话说,它是一个独立的整体应用程序。我想将此应用程序重构为基于Kafka Stream的应用程序。这意味着上面的for循环将分解为生产者和使用者,而不是像下面那样在单个for循环中完成所有工作:

  1. 独立程序,该程序从文件中读取一行,将其转换为事务对象,然后将其写入Kafka主题
  2. 另一方面,流消费者读取Transaction对象并创建两个KTable实例,分别保存总交易金额(空键)和按客户r id(以客户id为键)汇总交易金额
  3. 例如,将Ktable实例写入两个单独的Kafka主题(transaction-aggregate-topic和customer-aggregate-topic)。

我现在有了交易对象流。我也有两个主题,它们本质上是合计的。我的问题是:如何用每个事务的聚合KTables中的值重新丰富事务流,以便当我在处理结束时查看流时,每个事务对象现在都知道它相对于其他事务的百分比或同一客户在其他交易中所占的百分比。 (对于初学者来说,事务流没有键。一个键如何与两个KTables匹配事务流中的消息?)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。