Using Apache Flink With Delta Lake

Incorporating Flink datastreams into your Lakehouse Architecture

As with all parts of our platform, we are constantly raising the bar and adding new features to enhance developers’ abilities to build the applications that will make their Lakehouse a reality. Building real-time applications on Databricks is no exception. Features like asynchronous checkpointing, session windows, and Delta Live Tables allow organizations to build even more powerful, real-time pipelines on Databricks using Delta Lake as the foundation for all the data that flows through the Lakehouse.

Organizations that rely on Flink for real-time transformations might assume they cannot take advantage of these Delta Lake and Databricks features, but that is not the case. In this blog we will explore how Flink developers can build pipelines that integrate their Flink applications into the broader Lakehouse architecture.

High-level diagram of Flink application to Delta Lake data flows

A stateful Flink application

Let’s use a credit card company to explore how we can do this.

For credit card companies, preventing fraudulent transactions is table-stakes for a successful business. Credit card fraud poses both reputational and revenue risk to a financial institution and, therefore, credit card companies must have systems in place to remain constantly vigilant in preventing fraudulent transactions. These organizations may implement monitoring systems using Apache Flink, a distributed event-at-a-time processing engine with fine-grained control over streaming application state and time.

Below is a simple example of a fraud detection application in Flink. It monitors transaction amounts over time and sends an alert if a small transaction is immediately followed by a large transaction within one minute for any given credit card account. By leveraging Flink’s ValueState data type and KeyedProcessFunction together, developers can implement their business logic to trigger downstream alerts based on event and time states.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.entity.Transaction

object FraudDetector {
  val SMALL_AMOUNT: Double = 1.00
  val LARGE_AMOUNT: Double = 500.00
  val ONE_MINUTE: Long     = 60 * 1000L
}

@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  @transient private var flagState: ValueState[java.lang.Boolean] = _
  @transient private var timerState: ValueState[java.lang.Long] = _

  @throws[Exception]
  override def open(parameters: Configuration): Unit = {
    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    flagState = getRuntimeContext.getState(flagDescriptor)

    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
    timerState = getRuntimeContext.getState(timerDescriptor)
  }

  override def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    // Get the current state for the current key
    val lastTransactionWasSmall = flagState.value

    // Check if the flag is set
    if (lastTransactionWasSmall != null) {
      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
        // Output an alert downstream
        val alert = new Alert
        alert.setId(transaction.getAccountId)

        collector.collect(alert)
      }
      // Clean up our state
      cleanUp(context)
    }

    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
      // set the flag to true
      flagState.update(true)
      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE

      context.timerService.registerProcessingTimeTimer(timer)
      timerState.update(timer)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
      out: Collector[Alert]): Unit = {
    // remove flag after 1 minute
    timerState.clear()
    flagState.clear()
  }

  @throws[Exception]
  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
    // delete timer
    val timer = timerState.value
    ctx.timerService.deleteProcessingTimeTimer(timer)

    // clean up all states
    timerState.clear()
    flagState.clear()
  }
}
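
To sanity-check the rule itself without a Flink cluster, the same small-then-large logic can be sketched in plain Python. This is only an illustration under stated assumptions: the function name detect_fraud and the (account_id, timestamp_ms, amount) tuple layout are hypothetical, and the sketch uses timestamps carried on the events, whereas the Flink job above uses processing-time timers.

```python
SMALL_AMOUNT = 1.00
LARGE_AMOUNT = 500.00
ONE_MINUTE_MS = 60 * 1000


def detect_fraud(transactions):
    """Return the account ids that would raise an alert.

    `transactions` is an iterable of (account_id, timestamp_ms, amount)
    tuples, ordered by timestamp within each account (an assumption of
    this sketch, not something the Flink job requires).
    """
    # account_id -> time at which the "last transaction was small" flag expires.
    # This dict stands in for flagState, timerState, and the registered timer.
    flag_expiry = {}
    alerts = []

    for account_id, timestamp_ms, amount in transactions:
        expiry = flag_expiry.get(account_id)

        # Emulates onTimer: the flag is dropped once one minute has passed.
        if expiry is not None and timestamp_ms >= expiry:
            del flag_expiry[account_id]
            expiry = None

        if expiry is not None:
            if amount > LARGE_AMOUNT:
                alerts.append(account_id)
            # Emulates cleanUp: any follow-up transaction resets the flag.
            del flag_expiry[account_id]

        if amount < SMALL_AMOUNT:
            flag_expiry[account_id] = timestamp_ms + ONE_MINUTE_MS

    return alerts


sample_events = [
    (1, 0, 0.50),         # small transaction on account 1
    (1, 30_000, 600.00),  # large transaction 30 seconds later: alert
    (2, 0, 0.50),         # small transaction on account 2
    (2, 90_000, 600.00),  # large transaction 90 seconds later: flag expired
]
print(detect_fraud(sample_events))  # prints [1]
```

Account 2 does not alert because its large transaction arrives after the one-minute window has closed, which is the case the timer in the Flink job handles.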

In addition to sending alerts, most organizations will want the ability to perform analytics on all the transactions they process. Fraudsters are constantly evolving the techniques they use in the hopes of remaining undetected, so it is quite likely that a simple heuristic-based fraud detection application, such as the above, will not be sufficient for preventing all fraudulent activity. Organizations leveraging Flink for alerting will also need to combine disparate data sets to create advanced fraud detection models that analyze more than just transactional data, but include data points such as demographic information of the account holder, previous purchasing history, time and location of transactions, and more.

Integrating Flink applications using cloud object store sinks with Delta Lake

Diagram showing data flow from a Flink application to cloud object storage for consumption by Auto Loader into Delta Lake

There is a tradeoff between very low-latency operational use cases and performant OLAP on big datasets. To meet operational SLAs and prevent fraudulent transactions, Flink needs to produce records nearly as quickly as events are received, which results in small files (on the order of a few KBs) in the Flink application’s sink. This “small file problem” can lead to very poor performance in downstream queries, as execution engines spend more time listing directories and pulling files from cloud storage than they do processing the data within those files. Consider the same fraud detection application that writes transactions as Parquet files with the following schema:

root
 |-- dt: timestamp (nullable = true)
 |-- accountId: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- alert: boolean (nullable = true)

Fortunately, Databricks Auto Loader makes it easy to stream data landed into object storage from Flink applications into Delta Lake tables for downstream ML and BI on that data.

from pyspark.sql.functions import col, date_format

data_path = "/demo/flink_delta_blog/transactions"
delta_silver_table_path = "/demo/flink_delta_blog/silver_transactions"
checkpoint_path = "/demo/flink_delta_blog/checkpoints/delta_silver"

flink_parquet_schema = spark.read.parquet(data_path).schema

# Enable Auto Optimize to handle the small file problem
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

flink_parquet_to_delta_silver = (spark.readStream.format("cloudFiles")
                                 .option("cloudFiles.format", "parquet")
                                 .schema(flink_parquet_schema)
                                 .load(data_path)
                                 .withColumn("date", date_format(col("dt"), "yyyy-MM-dd"))  # use for partitioning the downstream Delta table
                                 .withColumnRenamed("dt", "timestamp")
                                 .writeStream
                                 .format("delta")
                                 .option("checkpointLocation", checkpoint_path)
                                 .partitionBy("date")
                                 .start(delta_silver_table_path)
                                )
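
The example above infers the schema with a batch read of the landing directory. As an alternative sketch, the schema can be declared up front as a DDL-formatted string, which the `.schema(...)` method on Spark readers also accepts. The column names and types below are copied from the schema printed earlier; the names FLINK_PARQUET_FIELDS and to_ddl are hypothetical helpers introduced for illustration.

```python
# Column names and Spark SQL types, taken from the Parquet schema shown earlier.
FLINK_PARQUET_FIELDS = [
    ("dt", "TIMESTAMP"),
    ("accountId", "STRING"),
    ("amount", "DOUBLE"),
    ("alert", "BOOLEAN"),
]


def to_ddl(fields):
    """Join (name, type) pairs into a DDL-formatted schema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in fields)


flink_parquet_schema_ddl = to_ddl(FLINK_PARQUET_FIELDS)
print(flink_parquet_schema_ddl)
# prints dt TIMESTAMP, accountId STRING, amount DOUBLE, alert BOOLEAN
```

With an active Spark session, passing flink_parquet_schema_ddl to `.schema(...)` in place of flink_parquet_schema would avoid the extra batch read, at the cost of keeping the declaration in sync with what the Flink job writes.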

Delta Lake tables automatically optimize the physical layout of data in cloud storage through compaction and indexing to mitigate the small file problem and enable performant downstream analytics.

-- Further optimize the physical layout of the table using ZORDER.
OPTIMIZE delta.`/demo/flink_delta_blog/silver_transactions`
ZORDER BY (accountId)

Much like Auto Loader can turn a static source like cloud storage into a streaming data source, Delta Lake tables also function as streaming sources despite being stored in object storage. This means that organizations using Flink for operational use cases can leverage this architectural pattern for streaming analytics without sacrificing their real-time requirements.

streaming_delta_silver_table = (spark.readStream.format("delta")
                                .load(delta_silver_table_path)
                                # ... additional streaming ETL and/or analytics here...
                               )

Integrating Flink applications using Apache Kafka and Delta Lake

Let’s say the credit card company wants to take the fraud detection model it built in Databricks and use that model to score data in real time. Pushing files to cloud storage might not be fast enough for some fraud detection SLAs, so the Flink application can instead write to a message bus such as Kafka, Amazon Kinesis, or Azure Event Hubs. Once the data is written to Kafka, a Databricks job can read from Kafka and write to Delta Lake.

Focused diagram showing the flow of data from a raw stream of data to Delta Lake using Flink and Kafka

For Flink developers, there is a Kafka Connector that can be integrated with your Flink projects so that DataStream API and Table API-based streaming jobs can write their results to an organization’s Kafka cluster. Note that as of the writing of this blog, Flink does not come packaged with this connector, so you will need to include the Kafka Connector JAR in your project’s build file (e.g., pom.xml or build.sbt).

Here is an example of how you would write the results of your DataStream in Flink to a topic on the Kafka Cluster:

package spendreport;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        // Comma-separated host:port list of your Kafka brokers
        String brokers = "enter-broker-information-here";

        // TransactionSchema is a user-provided SerializationSchema<Transaction> (not shown here)
        KafkaSink<Transaction> sink = KafkaSink.<Transaction>builder()
            .setBootstrapServers(brokers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("transactions")
                .setValueSerializationSchema(new TransactionSchema())
                .build()
            )
            .build();

        transactions.sinkTo(sink);

        env.execute("Fraud Detection");
    }
}

Now you can use Databricks to write a Structured Streaming application that reads from the Kafka topic the Flink DataStream writes to. To establish the read from Kafka...

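from pyspark.sql.functions import col, from_json

kafka = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
  .option("subscribe", "transactions")  # the topic the Flink job writes to
  .option("startingOffsets", "latest")
  .load())

# `schema` describes the JSON payload that the Flink job writes to the topic
kafkaTransformed = (kafka
  .select(from_json(col("value").cast("string"), schema).alias("data"))
  .select("data.*")
  # ...additional transformations
)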
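
The `schema` passed to from_json has to match whatever the Flink serializer puts on the topic. As a minimal sketch, assuming the Flink job serializes each Transaction as a JSON object with accountId, timestamp, and amount fields (the blog does not show the serializer, so this payload layout and the sample values are hypothetical), the contract can be written down and checked locally like this:

```python
import json

# Hypothetical message value, as it might arrive in the Kafka `value` column.
sample_value = json.dumps(
    {"accountId": 1, "timestamp": 1546272000000, "amount": 188.23}
).encode("utf-8")

# DDL-formatted schema string for the assumed payload.
schema = "accountId BIGINT, timestamp BIGINT, amount DOUBLE"

# Decode the bytes the same way the stream does: cast to string, then parse JSON.
decoded = json.loads(sample_value.decode("utf-8"))

# Compare the field names in the payload with the columns named in the schema.
expected_columns = [field.split()[0] for field in schema.split(", ")]
missing = [name for name in expected_columns if name not in decoded]
print(missing)  # prints []
```

from_json returns null for values it cannot parse rather than failing the stream, so a quick local check like this can surface a mismatch between the serializer and the schema before it shows up as silently empty rows.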

Once the data has been schematized, we can load our model and score the micro-batch of data that Spark processes after each trigger. For a more detailed example of machine learning models and Structured Streaming, see this article in our documentation.

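from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, count, sum as sum_, when

# Load the fitted pipeline that was trained and saved earlier
pipelineModel = PipelineModel.load("/path/to/trained/model")

streamingPredictions = (pipelineModel.transform(kafkaTransformed)
  .groupBy("id")
  .agg(
    # share of rows where the prediction matches the label
    (sum_(when(col("prediction") == col("label"), 1)) / count("label")).alias("true_prediction_rate"),
    count("label").alias("count")
  ))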
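
The aggregation computes, for each id, the share of rows where the prediction matches the label. A plain-Python sketch of the same calculation on a few hand-written rows can help confirm the metric means what you expect; the function name true_prediction_rate and the sample rows are hypothetical.

```python
from collections import defaultdict


def true_prediction_rate(rows):
    """Map each id to (share of matching predictions, row count).

    `rows` is an iterable of (id, prediction, label) tuples.
    """
    matches = defaultdict(int)
    counts = defaultdict(int)
    for row_id, prediction, label in rows:
        counts[row_id] += 1
        if prediction == label:
            matches[row_id] += 1
    return {
        row_id: (matches[row_id] / counts[row_id], counts[row_id])
        for row_id in counts
    }


sample_rows = [("a", 1, 1), ("a", 0, 1), ("b", 0, 0), ("b", 1, 1)]
rates = true_prediction_rate(sample_rows)
print(rates)  # prints {'a': (0.5, 2), 'b': (1.0, 2)}
```

One difference worth keeping in mind: for a group with no matching predictions, this sketch reports 0.0, whereas the Spark expression yields null because a sum over only null values is null.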

Now we can write to Delta by configuring the writeStream and pointing it to our fraud_predictions Delta Lake table. This will allow us to build reports on how we track and handle fraudulent transactions for our customers. We can also use the outputs to understand how our model performs over time, for example how many false positives it produces compared with accurate assessments.

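# A streaming aggregation without a watermark cannot use append mode,
# so the full result table is rewritten on each trigger.
(streamingPredictions.writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/location/in/cloud/storage")
  .toTable("fraud_predictions"))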

Conclusion

With either of these options, Flink with Auto Loader or Flink with Kafka, organizations can still leverage the features of Delta Lake and integrate their Flink applications into their broader Lakehouse architecture. Databricks has also been working with the Flink community to build a direct Flink to Delta Lake connector, which you can read more about here.
