Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 78 additions & 174 deletions cla-backend-go/cmd/signatures_timestamp_backfill/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,27 @@ const (
regionDefault = "us-east-1"

// attribute names
attrDateCreated = "date_created"
attrDateModified = "date_modified"
attrDateCreated = "date_created"
attrDateModified = "date_modified"
attrApproxDateCreated = "approx_date_created"
attrApproxDateModified = "approx_date_modified"

// cutoff date for _FIVETRAN_SYNCED usage
fivetranCutoffDate = "2024-03-09T00:00:00Z"

// update expression helpers
setPrefix = "SET "
commaSep = ", "
exprSetDateCreated = "#date_created = :date_created"
exprSetDateModified = "#date_modified = :date_modified"
condAnyMissing = "attribute_not_exists(#date_created) OR #date_created = :empty OR attribute_not_exists(#date_modified) OR #date_modified = :empty"
setPrefix = "SET "
commaSep = ", "
exprSetDateCreated = "#date_created = :date_created"
exprSetDateModified = "#date_modified = :date_modified"
exprSetApproxDateCreated = "#approx_date_created = :approx_date_created"
exprSetApproxDateModified = "#approx_date_modified = :approx_date_modified"
condAnyMissing = "attribute_not_exists(#date_created) OR #date_created = :empty OR attribute_not_exists(#date_modified) OR #date_modified = :empty"

// source labels
labelFromCreated = "from_created"
labelFromModified = "from_modified"
labelFivetranSynced = "fivetran_synced"
labelNow = "now"
labelSignURLCreated = "signurl_createdat"
labelSignURLIssued = "signurl_issuedat"
labelSignedOn = "signed_on"
Expand Down Expand Up @@ -91,6 +94,8 @@ type SignatureRecord struct {
SignatureID string `dynamodbav:"signature_id"`
DateCreated string `dynamodbav:"date_created"`
DateModified string `dynamodbav:"date_modified"`
ApproxDateCreated string `dynamodbav:"approx_date_created"`
ApproxDateModified string `dynamodbav:"approx_date_modified"`
SignedOn string `dynamodbav:"signed_on"`
UserDocusignDateSigned string `dynamodbav:"user_docusign_date_signed"`
UserDocusignRawXML string `dynamodbav:"user_docusign_raw_xml"`
Expand All @@ -104,12 +109,19 @@ type Counter map[string]int
// Inc adds one to the tally stored under label. A label that has not been
// seen yet starts from the map's zero value, so its tally becomes 1.
func (c Counter) Inc(label string) {
	c[label] += 1
}

// UpdateStats groups the per-attribute tallies kept while backfilling.
// Each Counter is keyed by the label of the source a timestamp was taken
// from, plus a "_total" key.
type UpdateStats struct {
	// Created counts writes to date_created.
	Created Counter
	// Modified counts writes to date_modified.
	Modified Counter
	// ApproxCreated counts writes to approx_date_created.
	ApproxCreated Counter
	// ApproxModified counts writes to approx_date_modified.
	ApproxModified Counter
}

// newStats returns an UpdateStats whose four counters are all allocated.
// Counter is a map, so every field must be non-nil before it is incremented;
// incrementing a nil map panics.
func newStats() UpdateStats {
	return UpdateStats{
		Created:        Counter{},
		Modified:       Counter{},
		ApproxCreated:  Counter{},
		ApproxModified: Counter{},
	}
}

// -----------------------------------------------------------------------------
Expand All @@ -122,7 +134,6 @@ func main() {
stage = "dev"
}
dryRun := getEnvBool("DRY_RUN")
allowCurrentTime := getEnvBool("ALLOW_CURRENT_TIME")
debug = getEnvBool("DEBUG")

// Snowflake helper & table
Expand All @@ -141,7 +152,7 @@ func main() {
if fallbackCLIPath == "" {
fallbackCLIPath = fmt.Sprintf("backfill-fallback-commands-cla-%s-signatures-%s.sh", stage, time.Now().UTC().Format("20060102T150405Z"))
}
fmt.Printf("Signature backfill | stage=%s dry-run=%t allow-current-time(after SF)=%t DEBUG=%t\n", stage, dryRun, allowCurrentTime, debug)
fmt.Printf("Signature backfill | stage=%s dry-run=%t DEBUG=%t\n", stage, dryRun, debug)
fmt.Printf("Snowflake: table=%s via %s (batch=%d)\n", sfTable, sfCmd, sfBatchSize)

awsSession, err := session.NewSession(&aws.Config{Region: aws.String(regionDefault)})
Expand All @@ -168,7 +179,17 @@ func main() {
return
}
cliFile = f
if _, e := fmt.Fprintf(cliFile, "#!/usr/bin/env bash\nset -euo pipefail\n# generated %s UTC, stage=%s, table=%s\n\n", time.Now().UTC().Format(time.RFC3339), stage, tableName); e != nil {
// Write the header of the generated fallback script. The "#!" line is emitted
// first because an interpreter line is only recognised at the very start of
// the file; the license header follows it as ordinary comments.
if _, e := fmt.Fprintf(
	cliFile,
	"#!/usr/bin/env bash\n"+
		"# Copyright The Linux Foundation and each contributor to CommunityBridge.\n"+
		"# SPDX-License-Identifier: MIT\n"+
		"set -euo pipefail\n"+
		"# generated %s UTC, stage=%s, table=%s\n\n",
	time.Now().UTC().Format(time.RFC3339),
	stage,
	tableName,
); e != nil {
	// Best effort: a failed header write is reported but does not abort the run.
	log.Printf("WARN: writing header to %s: %v", clean, e)
}
cliOpen = true
Expand Down Expand Up @@ -211,21 +232,6 @@ func main() {
)
cliCount += sfCliCount
updated += sfFixed

// 3) Final now() pass (only if allowed)
nowFixed, nowCliCount := finalNowFix(
ddb, tableName, stage, region, dryRun, &stats, pending, allowCurrentTime,
func(cmd string) {
openCLI()
if cliFile != nil {
if _, werr := fmt.Fprintln(cliFile, cmd); werr != nil {
log.Printf("WARN: could not append CLI line: %v", werr)
}
}
},
)
cliCount += nowCliCount
updated += nowFixed
skipped := len(pending)

fmt.Printf("\nCompleted. Updated: %d | Still pending (skipped): %d\n", updated, skipped)
Expand Down Expand Up @@ -599,34 +605,58 @@ func snowflakeFix(
updateExpr := setPrefix
vals := map[string]*dynamodb.AttributeValue{":empty": {S: aws.String("")}}
// condAnyMissing references #date_created and #date_modified, so those two
// placeholders are always declared. The #approx_* placeholders are declared
// only in the branches that put them into the update expression: DynamoDB
// rejects a request whose ExpressionAttributeNames contains a placeholder
// that no expression uses.
names := map[string]*string{
	"#date_created":  aws.String(attrDateCreated),
	"#date_modified": aws.String(attrDateModified),
}
first := true
if setCreated {
	// Use approx field if source is Fivetran synced, otherwise use regular field
	if srcC == labelFivetranSynced {
		updateExpr += exprSetApproxDateCreated
		names["#approx_date_created"] = aws.String(attrApproxDateCreated)
		vals[":approx_date_created"] = &dynamodb.AttributeValue{S: aws.String(finalC)}
	} else {
		updateExpr += exprSetDateCreated
		vals[":date_created"] = &dynamodb.AttributeValue{S: aws.String(finalC)}
	}
	first = false
}
if setModified {
	// A separator is needed only when a created-side clause was already added.
	if !first {
		updateExpr += commaSep
	}
	// Use approx field if source is Fivetran synced, otherwise use regular field
	if srcM == labelFivetranSynced {
		updateExpr += exprSetApproxDateModified
		names["#approx_date_modified"] = aws.String(attrApproxDateModified)
		vals[":approx_date_modified"] = &dynamodb.AttributeValue{S: aws.String(finalM)}
	} else {
		updateExpr += exprSetDateModified
		vals[":date_modified"] = &dynamodb.AttributeValue{S: aws.String(finalM)}
	}
}

// Stats
if setCreated {
stats.Created.Inc(srcC)
stats.Created.Inc("_total")
if srcC == labelFivetranSynced {
stats.ApproxCreated.Inc(srcC)
stats.ApproxCreated.Inc("_total")
} else {
stats.Created.Inc(srcC)
stats.Created.Inc("_total")
}
}
if setModified {
stats.Modified.Inc(srcM)
stats.Modified.Inc("_total")
if srcM == labelFivetranSynced {
stats.ApproxModified.Inc(srcM)
stats.ApproxModified.Inc("_total")
} else {
stats.Modified.Inc(srcM)
stats.Modified.Inc("_total")
}
}

cmd := buildAwsCliUpdate(region, stage, tableName, id, updateExpr, names, vals, condAnyMissing)
Expand Down Expand Up @@ -665,140 +695,6 @@ func snowflakeFix(
return fixed, cliCount
}

// -----------------------------------------------------------------------------
// Final now()-fill pass (only if allowed)
// -----------------------------------------------------------------------------

// finalNowFix walks the still-pending signatures and fills their missing
// date_created / date_modified values from the record's other timestamp or,
// failing that, from the current UTC time (RFC 3339).
//
// It does nothing and returns (0, 0) when allowNow is false or pending is
// empty. In dry-run mode no DynamoDB call is made; the equivalent AWS CLI
// command is passed to emitCLI instead and the entry counts as fixed. In live
// mode a failed update is logged, its CLI command is passed to emitCLI, and
// the entry is left in pending. Every entry that is fixed, or that turns out
// to need no write, is deleted from pending.
//
// fixed is the number of entries handled successfully (or simulated in
// dry-run); cliCount is the number of commands handed to emitCLI. A nil
// emitCLI is allowed and simply suppresses command emission.
func finalNowFix(
	ddb *dynamodb.DynamoDB,
	tableName, stage, region string,
	dryRun bool,
	stats *UpdateStats,
	pending map[string]*pendingInfo,
	allowNow bool,
	emitCLI func(string),
) (fixed int, cliCount int) {
	if len(pending) == 0 || !allowNow {
		return 0, 0
	}
	// One timestamp is shared by every record touched in this pass.
	stamp := time.Now().UTC().Format(time.RFC3339)

	// record forwards a command to emitCLI, when one was supplied, and counts it.
	record := func(cmd string) {
		if emitCLI == nil {
			return
		}
		emitCLI(cmd)
		cliCount++
	}

	for sigID, entry := range pending {
		needCreated, needModified := entry.MissingC, entry.MissingM
		if !(needCreated || needModified) {
			delete(pending, sigID)
			continue
		}

		// Candidate for date_created: prefer the existing date_modified.
		var candC, labelC string
		switch {
		case !needCreated:
			// nothing to derive
		case isMissing(entry.Record.DateModified):
			candC, labelC = stamp, labelNow
		default:
			candC, labelC = normalize(entry.Record.DateModified), labelFromModified
		}

		// Candidate for date_modified: existing date_created, then the
		// created candidate chosen above, then the current time.
		var candM, labelM string
		if needModified {
			if !isMissing(entry.Record.DateCreated) {
				candM, labelM = normalize(entry.Record.DateCreated), labelFromCreated
			} else if needCreated && candC != "" {
				candM, labelM = candC, labelFromCreated
			} else {
				candM, labelM = stamp, labelNow
			}
		}

		createdVal := ifEmpty(entry.Record.DateCreated, candC)
		modifiedVal := ifEmpty(entry.Record.DateModified, candM)

		writeCreated := needCreated && createdVal != ""
		writeModified := needModified && modifiedVal != ""
		if !writeCreated && !writeModified {
			delete(pending, sigID)
			continue
		}

		// Keep the pair monotonic: modified must not precede created.
		createdAt, modifiedAt := parseTime(createdVal), parseTime(modifiedVal)
		if !createdAt.IsZero() && !modifiedAt.IsZero() && modifiedAt.Before(createdAt) {
			modifiedVal, labelM, writeModified = createdVal, labelFromCreated, needModified
		}

		exprNames := map[string]*string{
			"#date_created":  aws.String(attrDateCreated),
			"#date_modified": aws.String(attrDateModified),
		}
		exprVals := map[string]*dynamodb.AttributeValue{":empty": {S: aws.String("")}}
		expr := setPrefix
		if writeCreated {
			expr += exprSetDateCreated
			exprVals[":date_created"] = &dynamodb.AttributeValue{S: aws.String(createdVal)}
			stats.Created.Inc(labelC)
			stats.Created.Inc("_total")
		}
		if writeModified {
			if writeCreated {
				expr += commaSep
			}
			expr += exprSetDateModified
			exprVals[":date_modified"] = &dynamodb.AttributeValue{S: aws.String(modifiedVal)}
			stats.Modified.Inc(labelM)
			stats.Modified.Inc("_total")
		}

		cmd := buildAwsCliUpdate(region, stage, tableName, sigID, expr, exprNames, exprVals, condAnyMissing)
		dbg(" NOW CLI: %s", cmd)

		if !dryRun {
			_, err := ddb.UpdateItem(&dynamodb.UpdateItemInput{
				TableName:                 aws.String(tableName),
				Key:                       map[string]*dynamodb.AttributeValue{"signature_id": {S: aws.String(sigID)}},
				UpdateExpression:          aws.String(expr),
				ExpressionAttributeNames:  exprNames,
				ExpressionAttributeValues: exprVals,
				ConditionExpression:       aws.String(condAnyMissing),
			})
			if err != nil {
				// Leave the entry pending and hand the command to emitCLI.
				log.Printf("Update failed (now) %s: %v", sigID, err)
				record(cmd)
				continue
			}
		} else {
			record(cmd)
		}

		fixed++
		delete(pending, sigID)
	}
	return fixed, cliCount
}

// -----------------------------------------------------------------------------
// Candidate collection & selection
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1181,14 +1077,20 @@ func buildAwsCliUpdate(region, stage, table, sigID, updateExpr string, names map
if av, ok := values[":date_modified"]; ok && av != nil && av.S != nil {
valsFlat[":date_modified"] = map[string]string{"S": *av.S}
}
if av, ok := values[":approx_date_created"]; ok && av != nil && av.S != nil {
valsFlat[":approx_date_created"] = map[string]string{"S": *av.S}
}
if av, ok := values[":approx_date_modified"]; ok && av != nil && av.S != nil {
valsFlat[":approx_date_modified"] = map[string]string{"S": *av.S}
}

kb, kerr := json.Marshal(key)
if kerr != nil {
kb = []byte(fmt.Sprintf(`{"signature_id":{"S":"%s"}}`, sigID))
}
nb, nerr := json.Marshal(namesFlat)
if nerr != nil {
nb = []byte(`{"#date_created":"date_created","#date_modified":"date_modified"}`)
nb = []byte(`{"#date_created":"date_created","#date_modified":"date_modified","#approx_date_created":"approx_date_created","#approx_date_modified":"approx_date_modified"}`)
}
vb, verr := json.Marshal(valsFlat)
if verr != nil {
Expand Down Expand Up @@ -1220,4 +1122,6 @@ func printStats(stats UpdateStats) {
}
print(attrDateCreated, stats.Created)
print(attrDateModified, stats.Modified)
print(attrApproxDateCreated, stats.ApproxCreated)
print(attrApproxDateModified, stats.ApproxModified)
}
2 changes: 1 addition & 1 deletion cla-backend-go/signatures_timestamp_backfill_dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# SPDX-License-Identifier: MIT
#!/bin/bash
# source setenv.sh
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && ALLOW_CURRENT_TIME='' DEBUG=true STAGE=dev DRY_RUN=true ./signatures_timestamp_backfill
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && DEBUG=true STAGE=dev DRY_RUN=true ./signatures_timestamp_backfill
4 changes: 2 additions & 2 deletions cla-backend-go/signatures_timestamp_backfill_prod.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# SPDX-License-Identifier: MIT
#!/bin/bash
# source setenv-prod.sh.secret
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && ALLOW_CURRENT_TIME='' DEBUG='' STAGE=prod DRY_RUN=true ./signatures_timestamp_backfill
# go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && ALLOW_CURRENT_TIME='' DEBUG=true STAGE=prod DRY_RUN='' ./signatures_timestamp_backfill
go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && DEBUG='' STAGE=prod DRY_RUN=true ./signatures_timestamp_backfill
# go build -o signatures_timestamp_backfill cmd/signatures_timestamp_backfill/main.go && DEBUG=true STAGE=prod DRY_RUN='' ./signatures_timestamp_backfill
Loading