
How to handle late-arriving and corrected records in Databricks Delta Lake pipelines
Most data pipelines don't fail loudly. They fail silently by producing incorrect data.
Six months into managing an insurance data platform, a monthly claims audit revealed a discrepancy. Total claim costs were running 12% higher than the source system.
Root cause: late-arriving data.
| claim_id | payout_amount | status | loaded_at | record_type |
|---|---|---|---|---|
| CLM-10291 | $3,800.00 | APPROVED | 2024-11-14 08:23 | Original |
| CLM-10291 | $4,200.00 | ADJUSTED | 2024-11-17 14:55 | Late correction |
| CLM-10294 | $12,150.00 | APPROVED | 2024-11-14 09:01 | Original |
| CLM-10297 | $950.00 | SUBMITTED | 2024-11-14 11:44 | Original |
Figure 1: Append-only pipeline creates duplicate records for the same claim_id due to late-arriving corrections
Why append-only loading fails
Append-only pipelines assume data never changes.
Append-only pipelines are popular because they are simple to build, easy to scale, and easy to audit. They work well when every record is new and never changes, like logs or clickstream data.
The problem starts when data can change after it is created. In real systems, claims are updated, orders are modified, and transactions are corrected. These updates arrive as new records, so the pipeline stores the corrected record alongside the original, and both remain in the table.
Append-only behavior (problem)
Every incoming record is added as a new row. Both the original and corrected records exist simultaneously. Data gets duplicated, and aggregations become incorrect. The result is incorrect numbers with no errors.
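To make the failure concrete, here is a toy, engine-agnostic sketch in plain Python using the figures from the table above. Summing the appended rows double-counts CLM-10291; keeping only the latest row per business key gives the true total.

```python
# Rows as loaded by an append-only pipeline, in arrival order
# (claim_id, payout_amount) -- values from Figure 1
rows = [
    ("CLM-10291", 3800.00),   # original
    ("CLM-10291", 4200.00),   # late correction, appended as a new row
    ("CLM-10294", 12150.00),
    ("CLM-10297", 950.00),
]

# Naive aggregation counts both versions of CLM-10291
naive_total = sum(amount for _, amount in rows)

# Keeping only the latest row per business key gives the true total
latest = {}
for claim_id, amount in rows:
    latest[claim_id] = amount  # later arrivals overwrite earlier ones

correct_total = sum(latest.values())
print(naive_total, correct_total)
```

The aggregation itself never errors; only the inputs are wrong, which is why the problem surfaces in an audit rather than a pipeline alert.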
Detecting duplicates
```python
df = spark.read.format("delta").load("/tmp/delta/claims")
display(df)

# Any claim_id with a count greater than 1 has duplicate rows
df.groupBy("claim_id").count().display()
```
Figure 2: Duplicate detection using GROUP BY highlights records appearing multiple times
The fix: merge and upsert patterns in Delta Lake
Delta Lake's MERGE INTO compares incoming records against the existing table on a business key. When a match is found, the target row is updated. When no match exists, the record is inserted. No duplicates. No correction logic pushed to consumers. Full ACID guarantees.
Merge and upsert behavior (solution)
Corrections update existing records rather than creating new ones. Only one row per business key is maintained at any time. The latest data replaces old data.
MERGE SQL
```sql
MERGE INTO delta.`/tmp/delta/claims` AS target
USING updates AS source
ON target.claim_id = source.claim_id
WHEN MATCHED AND source.updated_at > target.updated_at THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
```
Figure 3: Final dataset after MERGE with no duplicate records and correct values
Handling multiple corrections before MERGE: deduplication
Sometimes multiple corrections for the same record can arrive in the same batch. A claim might be updated twice within a few hours, a system might send multiple retries, or a batch load might contain historical corrections. In each of these cases, the same claim_id can appear multiple times in the incoming dataset.
Deduplication in Databricks (PySpark)
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Define window partitioned by business key, newest record first
window = Window.partitionBy("claim_id").orderBy(col("updated_at").desc())

# Keep only the latest record per claim_id
deduped_df = (
    source_df
    .withColumn("rn", row_number().over(window))
    .filter("rn = 1")
    .drop("rn")
)
display(deduped_df)
```
Watermarking and late-data windows for streaming pipelines
The merge pattern handles batch pipelines cleanly. Streaming pipelines reading from Kafka, Event Hubs, or Kinesis need an additional mechanism: watermarking. A watermark tells Spark how long to accept late data before closing a time window.
This is how watermarking controls late-arriving data in a streaming pipeline. Events arriving within the watermark window (4 days in this example) are accepted and merged. Late corrections arriving on Day 1 and Day 3 are still processed. Events arriving after the watermark boundary, such as the Day 5 event in the diagram, are ignored.
[Figure: Watermark boundary diagram showing Day 0 through Now, with Correction Day 1 and Correction Day 3 accepted and merged, and Event Day 5 dropped as past boundary]
This prevents infinite waiting for late data, excess memory usage, and unbounded state growth. The watermark duration should be based on the business SLA for how late data can realistically arrive.
Example code
```python
stream_df = spark.readStream.format("delta").load("/tmp/source")

# Accept records arriving up to 4 days late, per the business SLA
watermarked_df = stream_df.withWatermark("event_time", "4 days")
```
The same pattern across industries
Late-arriving data is not unique to insurance. It exists in almost every system where data can be updated after it is created. The solution remains the same: use MERGE based on a business key. Only the key and the conditions change, not the pattern.
Insurance
Claims are often corrected two to four days after initial submission. Updates include changes to payouts, diagnosis updates, or reclassifications of claim type.
Match on: claim_id
In some cases: claim_id + line_item_id
Logistics
Shipment statuses change over time as packages are delivered, delayed, or rerouted. Corrections can arrive late from carrier systems.
Match on: tracking_id + event_type
Important: Do not overwrite the entire record. Preserve shipment history.
Financial services
Transactions are adjusted during reconciliation. This includes chargebacks, reversals, and settlements.
Match on: transaction_id
Manufacturing
Sensor data may be corrected after calibration. Historical readings can be updated.
Match on: device_id + time range
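Because only the business key and match condition vary across these domains, the MERGE statement itself can be generated from a list of key columns. A minimal sketch follows; the helper name and its defaults are illustrative, not a Delta Lake API.

```python
def build_merge_sql(table_path, source_view, key_cols, order_col="updated_at"):
    """Render a Delta-style MERGE for an arbitrary business key.

    table_path, source_view, key_cols, and order_col are supplied by
    the caller; nothing here is domain-specific.
    """
    on_clause = " AND ".join(f"target.{c} = source.{c}" for c in key_cols)
    return (
        f"MERGE INTO delta.`{table_path}` AS target\n"
        f"USING {source_view} AS source\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED AND source.{order_col} > target.{order_col} "
        f"THEN UPDATE SET *\n"
        f"WHEN NOT MATCHED THEN INSERT *"
    )

# Insurance: single-column key
insurance_sql = build_merge_sql("/tmp/delta/claims", "updates", ["claim_id"])

# Logistics: composite key
logistics_sql = build_merge_sql(
    "/tmp/delta/shipments", "carrier_updates", ["tracking_id", "event_type"]
)
```

The generated statement changes only in its ON clause, which is the point: the pattern is stable even as the key varies.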
How to test for this problem before it hits production
Test 1: Duplicate key test
Send the same record twice, once as an original and once as a correction. Verify that only one row exists and that the final value matches the latest correction.
```python
df = spark.createDataFrame([
    ("CLM-1", 1000, "2024-01-01"),
    ("CLM-1", 1500, "2024-01-03"),  # correction
], ["claim_id", "amount", "updated_at"])
```
Expected: only 1 row, with the latest value of 1500.
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window = Window.partitionBy("claim_id").orderBy(col("updated_at").desc())
df_latest = (
    df.withColumn("rn", row_number().over(window))
      .filter("rn = 1")
      .drop("rn")
)
```
Test 2: Out-of-order test
Send the correction first, then the original record. Verify that older data does not overwrite newer data.
```python
df = spark.createDataFrame([
    ("CLM-2", 1500, "2024-01-03"),  # newer
    ("CLM-2", 1000, "2024-01-01"),  # older
], ["claim_id", "amount", "updated_at"])
```
Expected: the record with the later updated_at timestamp should win regardless of arrival order.
Test 3: Multiple corrections test
Send multiple updates for the same record within a single batch. Verify that only the latest record is used and that no merge errors occur.
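The expected outcome of this test can be pinned down in plain Python before wiring it into a Spark job. The helper below is illustrative and mirrors the row_number() deduplication shown earlier: given three versions of one claim in a single batch, only the latest survives.

```python
def latest_per_key(records, key="claim_id", order="updated_at"):
    """Keep only the most recent record per business key."""
    latest = {}
    for rec in records:
        k = rec[key]
        # ISO-8601 date strings compare correctly as plain strings
        if k not in latest or rec[order] > latest[k][order]:
            latest[k] = rec
    return list(latest.values())

batch = [
    {"claim_id": "CLM-3", "amount": 1000, "updated_at": "2024-01-01"},
    {"claim_id": "CLM-3", "amount": 1200, "updated_at": "2024-01-02"},
    {"claim_id": "CLM-3", "amount": 1500, "updated_at": "2024-01-03"},
]

deduped = latest_per_key(batch)
```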
Test 4: Watermark boundary test (streaming)
Send one event within the allowed time window and one event just outside the window. Verify that the valid event is processed and the late event is either ignored or routed to a dead-letter destination.
```python
df.withWatermark("event_time", "5 days")
```
Final takeaway
Late-arriving data is not an edge case. It is a fundamental property of real-world systems.
Pipelines don't fail loudly. They fail silently by producing incorrect data. That is why handling late data with MERGE, deduplication, and watermarking is essential for building reliable data pipelines.
What is a late-arriving record in a data pipeline?
A late-arriving record is a data update that reaches your pipeline after the original record has already been processed and stored. This commonly happens when source systems send corrections, adjustments, or status changes days after the initial transaction, as seen with insurance claims, shipment updates, and financial reconciliations.
Why don't append-only pipelines catch this problem on their own?
Because append-only pipelines are designed to treat every incoming record as new. They have no mechanism to recognize that an incoming record is a correction to something already stored. Both versions of the record are added to the table, and the pipeline reports success. There is no error, no alert, and no indication that the data is wrong.
How do I choose the right watermark duration for my pipeline?
Base it on your business SLA for late data rather than a technical default. Ask how late a correction can realistically arrive in your specific domain. In insurance, claims corrections commonly arrive within two to four days. In financial services, reconciliation adjustments may arrive within one business day. Set your watermark to cover the realistic upper bound for your use case, with a small buffer to avoid dropping valid corrections at the boundary.
What is the difference between MERGE and a standard INSERT in Delta Lake?
A standard INSERT adds every incoming record as a new row regardless of whether that record already exists. MERGE compares incoming records against the existing table on a business key and decides whether to update an existing row or insert a new one. This means corrections replace originals rather than duplicating them.
Do I need to deduplicate before running MERGE?
Yes, if your incoming batch can contain multiple versions of the same record. MERGE expects at most one source row per business key. If the same claim_id appears twice in the incoming dataset, Delta Lake aborts the MERGE with a multiple-source-rows-matched error. Deduplicating the source batch first, keeping only the latest record per key, ensures the MERGE runs cleanly.
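A lightweight pre-flight check can catch duplicate keys before the MERGE runs. A plain-Python sketch (the function name is illustrative):

```python
from collections import Counter

def duplicate_keys(records, key="claim_id"):
    """Return business keys that appear more than once in a source batch."""
    counts = Counter(rec[key] for rec in records)
    return sorted(k for k, n in counts.items() if n > 1)

batch = [
    {"claim_id": "CLM-1", "amount": 1000, "updated_at": "2024-01-01"},
    {"claim_id": "CLM-1", "amount": 1500, "updated_at": "2024-01-03"},
    {"claim_id": "CLM-2", "amount": 200,  "updated_at": "2024-01-02"},
]

offenders = duplicate_keys(batch)  # deduplicate before MERGE if non-empty
```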
When should I use watermarking, and when is MERGE enough?
MERGE is sufficient for batch pipelines where you process data in defined intervals and can tolerate some latency. Watermarking is needed for streaming pipelines reading from sources like Kafka, Event Hubs, or Kinesis, where data is continuously arriving and you need to define a boundary for how long the system should wait for late records before closing a time window.