微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Spark 根据记录在另一个 Cassandra 表中的不可用情况从 Cassandra 表中删除记录

如何解决使用 Spark 根据记录在另一个 Cassandra 表中的不可用情况从 Cassandra 表中删除记录

我有两个 Cassandra 表

表 1 [tb1_id,tb2_id,first_req_time,other_fields,Primary Key(tb1_id)]

表 2 [tb2_id,Primary Key(tb2_id)]

两个表都很大(每个大约 300 GB),表 1 有大约 25 亿条记录,表 2 有 2.2 亿条记录。

现在,我想根据以下标准定期从表 1 中删除一些记录(以保持大小):-

  1. 如果 first_req_time 超过 6 个月。
  2. 对于table-1中的记录[tb1_id,tb2_id],table-2中没有对应的记录。
import java.util.Calendar
import com.datastax.driver.core.ConsistencyLevel
import com.datastax.driver.core.querybuilder.{Delete,QueryBuilder,Select}
import com.datastax.spark.connector.cql.CassandraConnector
import java.util.ArrayList
import scala.util.control.Breaks._
import java.util.UUID.randomUUID

def isRecordActive(row: com.datastax.spark.connector.CassandraRow,refTime: Long): Boolean = {
  val first_req_time = row.get[Option[Long]]("first_req_time").getorElse(0L)
  first_req_time >= refTime
}


val cal = Calendar.getInstance()
cal.add(Calendar.MONTH,-6)
val limit = 3000
val connector = CassandraConnector(sc.getConf)
val noOfRowsDeleted = sc.accumulator(0)
val table1 = sc.cassandratable("db","table1")

table1.filter(row => !isRecordActive(row,cal.getTime.getTime)).foreachPartition(partition => {
  val session = connector.openSession
  val listofIds = new ArrayList[java.util.UUID]()
  val mapOfRecords = collection.mutable.Map[java.util.UUID,com.datastax.spark.connector.CassandraRow]()

  // collect limited number of records in mapOfRecords
  breakable {
    partition.foreach { elem =>
      val tb2_id = elem.get[Option[java.util.UUID]]("tb2_id").getorElse(null)
      listofIds.add(tb2_id)
      mapOfRecords(tb2_id) = elem
      if(listofIds.size > limit) break
    }
  }

  // filter records found in table2 
  val select: Select = QueryBuilder.select.from("db","table2")
  select.where(QueryBuilder.in("tb2_id",listofIds))
  select.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
  val resultSet = session.execute(select)
  if(!resultSet.isExhausted()){
    val resultIter = resultSet.iterator()
    while(resultIter.hasNext){
      val cur = resultIter.next
      val tb2_id = cur.getUUID("tb2_id")
      mapOfRecords.remove(tb2_id)
    }
  }

  // delete remaining records in mapOfRecords
  for ((tb2_id,tb1_record) <- mapOfRecords) {
    val delete: Delete = QueryBuilder.delete.from("db","table1")
    delete.where(QueryBuilder.eq("tb1_id",tb1_record.get[java.util.UUID]("tb1_id")))
    delete.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
    session.execute(delete)
    noOfRowsDeleted += 1
  }
  session.close()
})

解决方案的问题:

  1. noOfRowsDeleted 并不多,单次运行它们大约为 100K。假设我一个一个地执行后续运行,在每次迭代中 noOfRowsDeleted 减少。这种减少的原因是我每次都指的是分区(本地化查找)的前 3000 个元素,并试图在 table2 中找到它们,而可供删除的批次正在减少。
  2. 如果我尝试将限制从 3000 增加,则会出现内存不足异常。我怀疑这是因为我正在创建内部对象(listofIds、mapOfRecords),尽管我不确定。

分区数为 2000。

我意识到我在做一些愚蠢的事情,必须有更好的方法来实现我想要实现的目标。

努力实现:与目前的 10 万次相比,一次性删除 200 万次。

解决方法

我建议使用 Spark SQL 代替 RDD API,至少在准备数据时 - 与 RDD 相比,使用它更容易。

基本上你需要做的是:

  1. 准备数据 - 您只需读取两个表并过滤第一个表以获取旧数据,然后对第二个表执行 left anti join,因此您将只剩下不匹配的记录
  2. 使用来自 RDD API 的 deleteFromCassandra function 进行删除。

像这样(没有测试实际删除,只有数据准备,但应该可以):

import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._

val df1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "table1","keyspace" -> "ks"))
  .load().select("tb1_id","tb2_id","first_req_time")
val df2 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "table2","keyspace" -> "ks"))
  .load().select("tb2_id")

// Find old data (current_date - 180 days)
val df1_old = df1.filter(to_date($"first_req_time") < date_sub(current_date,180))
// find records that aren't in the second table
val joined = df1_old.join(df2,Seq("tb2_id"),"left_anti")
  .select("tb1_id","tb2_id")
// perform deletion
joined.rdd.deleteFromCassandra("ks","table1",keyColumns = SomeColumns("tb1_id","tb2_id")

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。