How to handle late-arriving and corrected records in Databricks Delta Lake pipelines

Written by Kavya Kumari
Data engineering

Most data pipelines don't fail loudly. They fail silently by producing incorrect data.

Six months into managing an insurance data platform, a monthly claims audit revealed a discrepancy. Total claim costs were running 12% higher than the source system.

Root cause: late-arriving data.

| claim_id  | payout_amount | status    | loaded_at        | record_type     |
|-----------|---------------|-----------|------------------|-----------------|
| CLM-10291 | $3,800.00     | APPROVED  | 2024-11-14 08:23 | Original        |
| CLM-10291 | $4,200.00     | ADJUSTED  | 2024-11-17 14:55 | Late correction |
| CLM-10294 | $12,150.00    | APPROVED  | 2024-11-14 09:01 | Original        |
| CLM-10297 | $950.00       | SUBMITTED | 2024-11-14 11:44 | Original        |

Figure 1: Append-only pipeline creates duplicate records for the same claim_id due to late-arriving corrections

Why append-only loading fails

Append-only pipelines assume data never changes.

Append-only pipelines are popular because they are simple to build, easy to scale, and easy to audit. They work well when every record is new and never changes, like logs or clickstream data.

The problem starts when data can change after it is created. In real systems, claims are updated, orders are modified, and transactions are corrected. An append-only pipeline sees each update as a brand-new record, so the original and the corrected record both land in the table and stay there.

Append-only behavior (problem)

Every incoming record is added as a new row, so the original and corrected records exist simultaneously. Keys get duplicated and aggregations inflate. The result is wrong numbers with no errors raised.

Detecting duplicates

```python
df = spark.read.format("delta").load("/tmp/delta/claims")
display(df)

# Any claim_id appearing more than once is a duplicate
df.groupBy("claim_id").count().filter("count > 1").display()
```

Figure 2: Duplicate detection using GROUP BY highlights records appearing multiple times
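
To see how the duplicates distort the numbers, compare a naive sum against a latest-record-only sum. A minimal sketch, reusing the claims table from above and assuming payout_amount is stored as a numeric column:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, sum as spark_sum

df = spark.read.format("delta").load("/tmp/delta/claims")

# Naive sum counts both the original and the corrected record
df.agg(spark_sum("payout_amount").alias("inflated_total")).display()

# Latest-only sum keeps one row per claim_id before aggregating
window = Window.partitionBy("claim_id").orderBy(col("loaded_at").desc())
latest = df.withColumn("rn", row_number().over(window)).filter("rn = 1").drop("rn")
latest.agg(spark_sum("payout_amount").alias("correct_total")).display()
```

The gap between the two totals is exactly the kind of discrepancy the claims audit surfaced.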

The fix: merge and upsert patterns in Delta Lake

Delta Lake's MERGE INTO compares incoming records against the existing table on a business key. When a match is found, the target row is updated. When no match exists, the record is inserted. No duplicates. No correction logic pushed to consumers. Full ACID guarantees.

Merge and upsert behavior (solution)

Corrections update existing records rather than creating new ones. Only one row per business key is maintained at any time. The latest data replaces old data.

MERGE SQL

```sql
MERGE INTO delta.`/tmp/delta/claims` AS target
USING updates AS source
ON target.claim_id = source.claim_id
WHEN MATCHED AND source.updated_at > target.updated_at THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
```

Figure 3: Final dataset after MERGE with no duplicate records and correct values
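
For pipelines written in PySpark rather than SQL, the same upsert can be expressed through the DeltaTable API. A minimal sketch, assuming the delta-spark package is installed and updates_df holds the incoming batch:

```python
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/tmp/delta/claims")

(target.alias("target")
    .merge(updates_df.alias("source"), "target.claim_id = source.claim_id")
    .whenMatchedUpdateAll(condition="source.updated_at > target.updated_at")
    .whenNotMatchedInsertAll()
    .execute())
```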

Handling multiple corrections before MERGE: deduplication

Sometimes multiple corrections for the same record can arrive in the same batch. A claim might be updated twice within a few hours, a system might send multiple retries, or a batch load might contain historical corrections. In each of these cases, the same claim_id can appear multiple times in the incoming dataset.

Deduplication in Databricks (PySpark)

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Define a window partitioned by the business key, newest record first
window = Window.partitionBy("claim_id").orderBy(col("updated_at").desc())

# Keep only the latest record per claim_id
deduped_df = source_df.withColumn(
    "rn", row_number().over(window)
).filter("rn = 1").drop("rn")

display(deduped_df)
```

Watermarking and late-data windows for streaming pipelines

The merge pattern handles batch pipelines cleanly. Streaming pipelines reading from Kafka, Event Hubs, or Kinesis need an additional mechanism: watermarking. A watermark tells Spark how long to accept late data before closing a time window.

The diagram below shows how watermarking controls late-arriving data in a streaming pipeline. Events arriving within the watermark window (4 days in this example) are accepted and merged, so the late corrections on Day 1 and Day 3 are still processed. Events arriving past the watermark boundary, such as the Day 5 event, are ignored.

[Figure: Watermark boundary diagram showing Day 0 through Now, with Correction Day 1 and Correction Day 3 accepted and merged, and Event Day 5 dropped as past boundary]

This prevents infinite waiting for late data, excess memory usage, and unbounded state growth. The watermark duration should be based on the business SLA for how late data can realistically arrive.

Example code

```python
stream_df = spark.readStream.format("delta").load("/tmp/source")

# Accept events up to 4 days late before closing the window
watermarked_df = stream_df.withWatermark("event_time", "4 days")
```
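
Watermarking bounds state, but applying the upsert itself from a stream typically goes through foreachBatch, which hands each micro-batch to ordinary batch code where MERGE is available. A hedged sketch combining the dedup and merge logic from earlier; upsert_to_delta and the checkpoint path are illustrative names:

```python
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

def upsert_to_delta(batch_df, batch_id):
    # Deduplicate the micro-batch, then MERGE it into the target table
    window = Window.partitionBy("claim_id").orderBy(col("updated_at").desc())
    latest = batch_df.withColumn("rn", row_number().over(window)) \
        .filter("rn = 1").drop("rn")

    target = DeltaTable.forPath(spark, "/tmp/delta/claims")
    (target.alias("target")
        .merge(latest.alias("source"), "target.claim_id = source.claim_id")
        .whenMatchedUpdateAll(condition="source.updated_at > target.updated_at")
        .whenNotMatchedInsertAll()
        .execute())

(watermarked_df.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/tmp/checkpoints/claims")
    .start())
```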

The same pattern across industries

Late-arriving data is not unique to insurance. It exists in almost every system where data can be updated after it is created. The solution remains the same: use MERGE based on a business key. Only the key and the conditions change, not the pattern.

Insurance

Claims are often corrected two to four days after initial submission. Updates include payout changes, diagnosis revisions, or claim-type reclassifications.

Match on: claim_id
In some cases: claim_id + line_item_id

Logistics

Shipment statuses change over time as packages are delivered, delayed, or rerouted. Corrections can arrive late from carrier systems.

Match on: tracking_id + event_type

Important: Do not overwrite the entire record. Preserve shipment history.

Financial services

Transactions are adjusted during reconciliation. This includes chargebacks, reversals, and settlements.

Match on: transaction_id

Manufacturing

Sensor data may be corrected after calibration. Historical readings can be updated.

Match on: device_id + time range
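
As an illustration of how only the merge condition changes, here is a hedged PySpark sketch for the logistics case above; the shipment_events path and shipment_updates_df are hypothetical names:

```python
from delta.tables import DeltaTable

events = DeltaTable.forPath(spark, "/tmp/delta/shipment_events")

# Composite key: one row per (tracking_id, event_type), so earlier
# shipment events are preserved rather than overwritten
(events.alias("target")
    .merge(
        shipment_updates_df.alias("source"),
        "target.tracking_id = source.tracking_id AND "
        "target.event_type = source.event_type")
    .whenMatchedUpdateAll(condition="source.updated_at > target.updated_at")
    .whenNotMatchedInsertAll()
    .execute())
```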

How to test for this problem before it hits production

Test 1: Duplicate key test

Send the same record twice, once as an original and once as a correction. Verify that only one row exists and that the final value matches the latest correction.

```python
df = spark.createDataFrame([
    ("CLM-1", 1000, "2024-01-01"),
    ("CLM-1", 1500, "2024-01-03")  # correction
], ["claim_id", "amount", "updated_at"])
```

Expected: only 1 row, with the latest value of 1500.

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window = Window.partitionBy("claim_id").orderBy(col("updated_at").desc())
df_latest = df.withColumn("rn", row_number().over(window)) \
    .filter("rn = 1") \
    .drop("rn")
```
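
A couple of assertions on the deduplicated frame make the check repeatable; this sketch assumes the df and df_latest frames defined just above:

```python
rows = df_latest.collect()
assert len(rows) == 1, f"expected 1 row per claim_id, got {len(rows)}"
assert rows[0]["amount"] == 1500, "latest correction should win"
```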

Test 2: Out-of-order test

Send the correction first, then the original record. Verify that older data does not overwrite newer data.

```python
df = spark.createDataFrame([
    ("CLM-2", 1500, "2024-01-03"),  # newer
    ("CLM-2", 1000, "2024-01-01")   # older
], ["claim_id", "amount", "updated_at"])
```

Expected: the record with the later updated_at timestamp should win regardless of arrival order.

Test 3: Multiple corrections test

Send multiple updates for the same record within a single batch. Verify that only the latest record is used and that no merge errors occur.
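
A minimal sketch of this test, reusing the window deduplication pattern from Test 1; the three rows simulate two corrections landing in one batch:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

df = spark.createDataFrame([
    ("CLM-3", 1000, "2024-01-01"),
    ("CLM-3", 1200, "2024-01-02"),  # first correction
    ("CLM-3", 1500, "2024-01-03")   # second correction
], ["claim_id", "amount", "updated_at"])

window = Window.partitionBy("claim_id").orderBy(col("updated_at").desc())
df_latest = df.withColumn("rn", row_number().over(window)) \
    .filter("rn = 1").drop("rn")

assert df_latest.count() == 1
assert df_latest.first()["amount"] == 1500  # latest correction wins
```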

Test 4: Watermark boundary test (streaming)

Send one event within the allowed time window and one event just outside the window. Verify that the valid event is processed and the late event is either ignored or routed to a dead-letter destination.

```python
df.withWatermark("event_time", "5 days")
```
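
Watermark behavior is awkward to assert directly, so one practical pattern is to route anything past the SLA cutoff to a dead-letter table inside foreachBatch and assert on that table instead. A hedged sketch; the paths and the 5-day cutoff are illustrative:

```python
from pyspark.sql import functions as F

def route_late_events(batch_df, batch_id):
    # Events older than the SLA cutoff go to the dead-letter table
    cutoff = F.current_timestamp() - F.expr("INTERVAL 5 DAYS")
    batch_df.filter(F.col("event_time") >= cutoff) \
        .write.format("delta").mode("append").save("/tmp/delta/claims_stream")
    batch_df.filter(F.col("event_time") < cutoff) \
        .write.format("delta").mode("append").save("/tmp/delta/claims_dead_letter")

(spark.readStream.format("delta").load("/tmp/source")
    .writeStream
    .foreachBatch(route_late_events)
    .option("checkpointLocation", "/tmp/checkpoints/watermark_test")
    .start())
```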

Final takeaway

Late-arriving data is not an edge case. It is a fundamental property of real-world systems.

Pipelines don't fail loudly. They fail silently by producing incorrect data. That is why handling late data with MERGE, deduplication, and watermarking is essential for building reliable data pipelines.


What is a late-arriving record in a data pipeline?

A late-arriving record is a data update that reaches your pipeline after the original record has already been processed and stored. This commonly happens when source systems send corrections, adjustments, or status changes days after the initial transaction, as seen with insurance claims, shipment updates, and financial reconciliations.

Why don't append-only pipelines catch this problem on their own?

Because append-only pipelines are designed to treat every incoming record as new. They have no mechanism to recognize that an incoming record is a correction to something already stored. Both versions of the record are added to the table, and the pipeline reports success. There is no error, no alert, and no indication that the data is wrong.

How do I choose the right watermark duration for my pipeline?

Base it on your business SLA for late data rather than a technical default. Ask how late a correction can realistically arrive in your specific domain. In insurance, claims corrections commonly arrive within two to four days. In financial services, reconciliation adjustments may arrive within one business day. Set your watermark to cover the realistic upper bound for your use case, with a small buffer to avoid dropping valid corrections at the boundary.

What is the difference between MERGE and a standard INSERT in Delta Lake?

A standard INSERT adds every incoming record as a new row regardless of whether that record already exists. MERGE compares incoming records against the existing table on a business key and decides whether to update an existing row or insert a new one. This means corrections replace originals rather than duplicating them.

Do I need to deduplicate before running MERGE?

Yes, if your incoming batch can contain multiple versions of the same record. MERGE expects one source row per business key. If the same claim_id appears twice in the incoming dataset, the MERGE will encounter a conflict. Deduplicating the source batch first, keeping only the latest record per key, ensures the MERGE runs cleanly.

When should I use watermarking, and when is MERGE enough?

MERGE is sufficient for batch pipelines where you process data in defined intervals and can tolerate some latency. Watermarking is needed for streaming pipelines reading from sources like Kafka, Event Hubs, or Kinesis, where data is continuously arriving and you need to define a boundary for how long the system should wait for late records before closing a time window.
