Skip to content

Commit 40f115e

Browse files
committed
Add data comparisons for non-timestamped records in Meltano
Because of the way v2 meetings is eventing, we don't want to re-PUT the mapping every time we run the sync if nothing has changed. This changes the default behavior for the NATS loader to do full-object comparisons for records that don't have a timestamp field coming across from the tap (extractor). Signed-off-by: Eric Searcy <[email protected]>
1 parent ca24d7c commit 40f115e

File tree

1 file changed

+47
-23
lines changed
  • meltano/load/target-nats-kv/src/target_nats_kv

1 file changed

+47
-23
lines changed

meltano/load/target-nats-kv/src/target_nats_kv/__init__.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,9 @@ async def persist_messages(
162162
bookmark = bookmarks[stream]
163163
if bookmark is not None and len(bookmark) > 0:
164164
bookmark_attr = bookmark[0]
165-
if bookmark_attr is not None and refresh_mode != "full":
166-
if bookmark_attr in o.record:
167-
bookmark_source = o.record[bookmark_attr]
168-
else:
169-
bookmark_source = ""
170-
171-
# Compare data freshness using "bookmark" attribute.
165+
if refresh_mode != "full":
166+
# Compare data using either the "bookmark" or a data
167+
# comparison.
172168
try:
173169
# Try to fetch current value.
174170
current = await kv_client.get(key)
@@ -196,20 +192,48 @@ async def persist_messages(
196192
)
197193
continue
198194

199-
if bookmark_attr in current_record:
200-
bookmark_target = current_record[bookmark_attr]
195+
should_update = False
196+
if bookmark_attr is not None:
197+
# Compare bookmark values as strings to determine if the
198+
# new record should be refreshed based on the refresh mode.
199+
# Note, "newer" (which is the default) is implicit in the
200+
# "or" clause of this condition (if it's not "full" or
201+
# "same", it must be "newer").
202+
if bookmark_attr in o.record:
203+
bookmark_source = o.record[bookmark_attr]
204+
else:
205+
bookmark_source = ""
206+
207+
if bookmark_attr in current_record:
208+
bookmark_target = current_record[bookmark_attr]
209+
else:
210+
bookmark_target = ""
211+
212+
if (
213+
refresh_mode == "same"
214+
and str(bookmark_source) >= str(bookmark_target)
215+
) or str(bookmark_source) > str(bookmark_target):
216+
should_update = True
217+
elif refresh_mode == "same":
218+
# If there is no bookmark, "same" means always update.
219+
should_update = True
201220
else:
202-
bookmark_target = ""
203-
204-
# Compare bookmark values as strings to determine if the
205-
# new record should be refreshed based on the refresh mode.
206-
# Note, "newer" (which is the default) is implicit in the
207-
# "or" clause of this condition (if it's not "full" or
208-
# "same", it must be "newer").
209-
if (
210-
refresh_mode == "same"
211-
and str(bookmark_source) >= str(bookmark_target)
212-
) or str(bookmark_source) > str(bookmark_target):
221+
# No bookmark, and refresh_mode=="newer" (by process of
222+
# elimination), so, compare the data itself.
223+
target_record_filtered = {
224+
k: v
225+
for k, v in current_record.items()
226+
if not k.startswith("_sdc_")
227+
}
228+
source_record_filtered = {
229+
k: v
230+
for k, v in o.record.items()
231+
if not k.startswith("_sdc_")
232+
}
233+
if target_record_filtered != source_record_filtered:
234+
should_update = True
235+
236+
if should_update:
213237
# Update with revision
214238
await kv_client.update(
215239
key=key,
@@ -221,7 +245,7 @@ async def persist_messages(
221245
logger.debug(
222246
(
223247
"Skipping record for stream %s with key %s due to "
224-
"not being newer."
248+
"not needing update."
225249
),
226250
stream,
227251
primary_key_value,
@@ -235,8 +259,8 @@ async def persist_messages(
235259
value=json.dumps(o.record).encode("utf-8"),
236260
)
237261
else:
238-
# No bookmarks for this stream, or, user has requested "full"
239-
# sync, so use regular put.
262+
# User has requested "full" sync, so use "put" without
263+
# data checks.
240264
await kv_client.put(
241265
key=key,
242266
value=json.dumps(o.record).encode("utf-8"),

0 commit comments

Comments
 (0)