Skip to content

Commit 8d1c845

Browse files
Restored & migrated nodes from DB backup, fixed bug w/ equalification
1 parent 3dba2e6 commit 8d1c845

File tree

10 files changed

+201
-93
lines changed

10 files changed

+201
-93
lines changed

src/internal.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { migrateMessagesAndTags, processScans } from '#src/internal/index';
1+
import { fixMessageNodes, fixRows, migrateMessagesAndTags, normalizeNodesForUser, processScans } from '#src/internal/index';
22

33
export const internal = async (event) => {
44
if (event.path.endsWith('/processScans')) {
@@ -7,4 +7,13 @@ export const internal = async (event) => {
77
else if (event.path.endsWith('/migrateMessagesAndTags')) {
88
return migrateMessagesAndTags(event);
99
}
10+
else if (event.path.endsWith('/normalizeNodesForUser')) {
11+
return normalizeNodesForUser(event);
12+
}
13+
else if (event.path.endsWith('/fixRows')) {
14+
return fixRows(event);
15+
}
16+
else if (event.path.endsWith('/fixMessageNodes')) {
17+
return fixMessageNodes(event);
18+
}
1019
}

src/internal/fixMessageNodes.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { chunk, db, dbRestore } from '#src/utils';
2+
3+
export const fixMessageNodes = async (event) => {
4+
const { userId } = event;
5+
6+
await dbRestore.connect();
7+
const oldRows = (await dbRestore.query({
8+
text: `SELECT "id", "message_id", "enode_id" FROM "message_nodes" WHERE "user_id"=$1 ORDER BY "id" DESC`,
9+
values: [userId],
10+
})).rows;
11+
await dbRestore.clean();
12+
13+
await db.connect();
14+
const currentNodeIds = (await db.query({
15+
text: `SELECT "id" FROM "enodes" WHERE "user_id"=$1 ORDER BY "created_at" DESC`,
16+
values: [userId],
17+
})).rows.map(obj => obj.id);
18+
19+
const filteredRows = oldRows.filter(obj => currentNodeIds.includes(obj.enode_id))
20+
21+
await Promise.all(chunk(filteredRows, 1000).map(async (chunkOfRows) => {
22+
await db.query(`INSERT INTO "message_nodes" ("id", "message_id", "enode_id", "user_id") VALUES ${chunkOfRows.map(row => `(
23+
'${row.id}',
24+
'${row.message_id}',
25+
'${row.enode_id}',
26+
'${userId}'
27+
)`).join()} ON CONFLICT DO NOTHING`);
28+
}));
29+
await db.clean();
30+
31+
return;
32+
}

src/internal/fixRows.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { chunk, db, dbRestore } from '#src/utils';
2+
3+
export const fixRows = async (event) => {
4+
const { userId } = event;
5+
6+
await dbRestore.connect();
7+
const oldRows = (await dbRestore.query({
8+
text: `SELECT "id", "created_at", "url_id", "html", "targets" FROM "enodes" WHERE "user_id"=$1 ORDER BY "created_at" DESC`,
9+
values: [userId],
10+
})).rows;
11+
await dbRestore.clean();
12+
13+
await db.connect();
14+
await Promise.all(chunk(oldRows, 1000).map(async (chunkOfRows) => {
15+
await db.query(`INSERT INTO "enodes" ("id", "created_at", "updated_at", "user_id", "url_id", "equalified", "html", "targets") VALUES ${chunkOfRows.map(row => `(
16+
'${row.id}',
17+
'${row.created_at.toISOString()}',
18+
'${row.created_at.toISOString()}',
19+
'${userId}',
20+
'${row.url_id}',
21+
false,
22+
'${escapeSqlString(row.html)}',
23+
'${escapeSqlString(JSON.stringify(row.targets))}'
24+
)`).join()} ON CONFLICT DO NOTHING`);
25+
}));
26+
await db.clean();
27+
28+
return;
29+
}
30+
31+
const escapeSqlString = (value) => {
32+
if (value === null || typeof value === 'undefined') {
33+
return 'NULL';
34+
}
35+
return String(value).replace(/'/g, "''");
36+
}

src/internal/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
export * from './processScans'
2-
export * from './migrateMessagesAndTags'
2+
export * from './migrateMessagesAndTags'
3+
export * from './normalizeNodesForUser'
4+
export * from './fixRows'
5+
export * from './fixMessageNodes'
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { db, hashStringToUuid, normalizeHtmlWithVdom } from '#src/utils';
2+
3+
export const normalizeNodesForUser = async (event) => {
4+
await db.connect();
5+
6+
const rows = (await db.query({
7+
text: `SELECT "id", "created_at", "html" FROM "enodes" WHERE "user_id"=$1 ORDER BY "created_at" DESC`, // AND "html_hash_id" IS NULL
8+
values: [event.userId],
9+
})).rows;
10+
console.log(JSON.stringify({ rowsLength: rows.length }));
11+
for (const row of rows) {
12+
// Remove all existing node updates
13+
await db.query({
14+
text: `DELETE FROM "enode_updates" WHERE "enode_id"=$1`,
15+
values: [row.id],
16+
});
17+
18+
// Normalize html / hash / store
19+
const normalizedHtml = normalizeHtmlWithVdom(row.html);
20+
const htmlHashId = hashStringToUuid(normalizedHtml);
21+
await db.query({
22+
text: `UPDATE "enodes" SET "html_normalized"=$1, "html_hash_id"=$2 WHERE "id"=$3`,
23+
values: [normalizedHtml, htmlHashId, row.id],
24+
});
25+
26+
// Now that we've normalized nodes, let's insert the correct node updates
27+
await db.query({
28+
text: `INSERT INTO "enode_updates" ("user_id", "created_at", "updated_at", "enode_id", "equalified") VALUES ($1, $2, $3, $4, $5)`,
29+
values: [event.userId, '2024-10-07', '2024-10-07', row.id, false],
30+
});
31+
}
32+
await db.clean();
33+
return;
34+
}

src/internal/processScans.ts

Lines changed: 16 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import { chunk, db, isStaging, sleep, hashStringToUuid } from '#src/utils';
2-
import * as cheerio from 'cheerio';
1+
import { chunk, db, isStaging, sleep, hashStringToUuid, normalizeHtmlWithVdom } from '#src/utils';
32

43
export const processScans = async (event) => {
5-
console.log(`START PROCESS SCANS`);
4+
console.log(`Start processScans`);
65
const startTime = new Date().getTime();
76
await db.connect();
87
const { userId, propertyId, discovery } = event;
@@ -16,13 +15,13 @@ export const processScans = async (event) => {
1615
})).rows.map(obj => obj.job_id);
1716
const allNodeIds = [];
1817
const failedNodeIds = [];
19-
const pollScans = (givenJobIds) => new Promise(async (finalRes) => {
18+
const pollScans = (givenJobIds) => new Promise(async (outerRes) => {
2019
await sleep(1000);
2120
const remainingScans = [];
2221
const batchesOfJobIds = chunk(givenJobIds, 25);
2322
for (const [index, batchOfJobIds] of batchesOfJobIds.entries()) {
24-
console.log(`Start ${index} of ${batchesOfJobIds.length} batches`);
25-
await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (res) => {
23+
console.log(`Start ${index + 1} of ${batchesOfJobIds.length} batches`);
24+
await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (innerRes) => {
2625
try {
2726
const scanResults = await fetch(`https://scan${isStaging ? '-dev' : ''}.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(10000) });
2827
const { result, status } = await scanResults.json();
@@ -57,8 +56,9 @@ export const processScans = async (event) => {
5756
console.log(err);
5857
remainingScans.push(jobId);
5958
}
60-
res(1);
59+
innerRes(1);
6160
})));
61+
console.log(`End ${index + 1} of ${batchesOfJobIds.length} batches`);
6262
}
6363
const stats = { userId, remainingScans: remainingScans.length };
6464
console.log(JSON.stringify(stats));
@@ -68,6 +68,7 @@ export const processScans = async (event) => {
6868
const tenMinutes = 10 * 60 * 1000;
6969
if (deltaTime <= tenMinutes) {
7070
await pollScans(remainingScans);
71+
outerRes(1);
7172
}
7273
else if (deltaTime > tenMinutes) {
7374
const scansExist = (await db.query({
@@ -81,12 +82,13 @@ export const processScans = async (event) => {
8182
}
8283
}
8384
}
84-
else {
85-
finalRes(1);
86-
}
85+
outerRes(1);
8786
});
87+
console.log(`Start pollScans`);
8888
await pollScans(jobIds);
89+
console.log(`End pollScans`);
8990

91+
console.log(`Start equalification`);
9092
// At the end of all scans, reconcile equalified nodes
9193
// Set node equalified to true for previous nodes associated w/ this scan (EXCEPT failed ones)
9294
const allPropertyUrlIds = (await db.query({
@@ -105,7 +107,6 @@ export const processScans = async (event) => {
105107
text: `SELECT "id" FROM "enode_updates" WHERE "user_id"=$1 AND "enode_id"=$2 AND "created_at"::text LIKE $3`,
106108
values: [userId, equalifiedNodeId, `${new Date().toISOString().split('T')[0]}%`],
107109
})).rows[0]?.id;
108-
console.log(JSON.stringify({ equalifiedNodeId }));
109110
if (existingNodeUpdateId) {
110111
// We found an existing node update for today, let's simply update it
111112
await db.query({
@@ -119,14 +120,12 @@ export const processScans = async (event) => {
119120
text: `INSERT INTO "enode_updates" ("user_id", "enode_id", "equalified") VALUES ($1, $2, $3)`,
120121
values: [userId, equalifiedNodeId, true],
121122
});
122-
console.log(JSON.stringify({ message: 'Inserted equalified node update' }));
123123
}
124124
// Now that we've inserted an "equalified" node update, let's set the parent node to "equalified" too!
125125
await db.query({
126126
text: `UPDATE "enodes" SET "equalified"=$1 WHERE "id"=$2`,
127127
values: [true, equalifiedNodeId],
128128
});
129-
console.log(JSON.stringify({ message: 'Update equalified node' }));
130129
}
131130

132131
// For our failed nodes, we need to "copy" the last node update that exists (if there even is one!)
@@ -135,7 +134,6 @@ export const processScans = async (event) => {
135134
text: `SELECT "id" FROM "enode_updates" WHERE "user_id"=$1 AND "enode_id"=$2 AND "created_at"::text LIKE $3`,
136135
values: [userId, failedNodeId, `${new Date().toISOString().split('T')[0]}%`],
137136
})).rows[0]?.id;
138-
console.log(JSON.stringify({ existingNodeUpdateId }))
139137
if (existingNodeUpdateId) {
140138
await db.query({
141139
text: `UPDATE "enode_updates" SET "equalified"=$1 WHERE "id"=$2`,
@@ -148,18 +146,18 @@ export const processScans = async (event) => {
148146
text: `INSERT INTO "enode_updates" ("user_id", "enode_id", "equalified") VALUES ($1, $2, $3)`,
149147
values: [userId, failedNodeId, false],
150148
});
151-
console.log(JSON.stringify({ message: 'no node update found!' }))
152149
}
153150
// Now that we've inserted an "unequalified" node update, let's set the parent node to "unequalified" too!
154151
await db.query({
155152
text: `UPDATE "enodes" SET "equalified"=$1 WHERE "id"=$2`,
156153
values: [false, failedNodeId],
157154
});
158-
console.log(JSON.stringify({ message: 'updating parent node!' }))
159155
}
160156

157+
console.log(`End equalification`);
158+
console.log(`End processScans`);
159+
161160
await db.clean();
162-
console.log(`END PROCESS SCANS`);
163161
return;
164162
}
165163

@@ -282,75 +280,4 @@ const scanProcessor = async ({ result, jobId, userId, propertyId }) => {
282280
});
283281

284282
return result.nodes.map(obj => obj.id);
285-
}
286-
287-
const normalizeHtmlWithRegex = (html) => {
288-
if (!html) return html;
289-
290-
// Remove query parameters from src and href attributes
291-
html = html.replace(/(src|href)=(["'])([^?"']*)\?[^"']*?\2/g, '$1=$2$3$2');
292-
293-
// Normalize data-version paths with version hashes
294-
html = html.replace(/data-version=(["'])(\/s\/player\/)[a-zA-Z0-9]{8,}(\/.*?\2)/g, 'data-version=$1$2NORMALIZED$3');
295-
296-
// Normalize tabindex attributes
297-
html = html.replace(/tabindex=["']-?\d+["']/g, 'tabindex="NORMALIZED"');
298-
299-
return html;
300-
};
301-
302-
const normalizeHtmlWithVdom = (html) => {
303-
if (!html) return '';
304-
305-
const $ = cheerio.load(`<div id="wrapper">${html}</div>`);
306-
const root = $('#wrapper');
307-
308-
// Process all elements
309-
root.find('*').each(function () {
310-
const el = $(this);
311-
312-
// Normalize IDs with numbers
313-
if (el.attr('id') && /\d{4,}/.test(el.attr('id'))) {
314-
el.attr('id', 'NORMALIZED');
315-
}
316-
317-
// Always normalize tabindex
318-
if (el.attr('tabindex')) {
319-
el.attr('tabindex', 'NORMALIZED');
320-
}
321-
322-
// Remove query params from URLs
323-
['src', 'href'].forEach(attr => {
324-
if (el.attr(attr) && el.attr(attr).includes('?')) {
325-
el.attr(attr, el.attr(attr).split('?')[0]);
326-
}
327-
});
328-
329-
// Handle h5p quiz elements
330-
if (el.hasClass('h5p-sc-alternative')) {
331-
if (el.hasClass('h5p-sc-is-correct') || el.hasClass('h5p-sc-is-wrong')) {
332-
el.removeClass('h5p-sc-is-correct h5p-sc-is-wrong')
333-
.addClass('h5p-sc-is-NORMALIZED');
334-
}
335-
}
336-
337-
// Normalize data-version attributes
338-
if (el.attr('data-version') && el.attr('data-version').includes('/s/player/')) {
339-
el.attr('data-version', el.attr('data-version')
340-
.replace(/\/s\/player\/[a-zA-Z0-9]{8,}\//, '/s/player/NORMALIZED/'));
341-
}
342-
343-
// Add more element-specific normalizations based on your data patterns
344-
});
345-
346-
// Remove all whitespace between tags for more reliable comparison
347-
let result = root.html();
348-
349-
// Remove excess whitespace for more consistent matching
350-
result = result.replace(/>\s+</g, '><');
351-
352-
// Remove all text node whitespace variations
353-
result = result.replace(/\s{2,}/g, ' ');
354-
355-
return result;
356-
}
283+
}

src/routes/getResultsAll.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ export const getResultsAll = async ({ request, reply }) => {
174174
};
175175
const compressedBody = gzipSync(JSON.stringify(body));
176176
const cacheExpiry = new Date();
177-
cacheExpiry.setMinutes(cacheExpiry.getMinutes() + 5);
177+
cacheExpiry.setMinutes(cacheExpiry.getMinutes() + 1);
178178
await db.query({
179179
text: `UPDATE "reports" SET "cache_gzip"=$1, "cache_date"=$2 WHERE "id"=$3`,
180180
values: [compressedBody, cacheExpiry, request.query.reportId],

src/utils/db.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,13 @@ export const db = new ServerlessClient({
77
password: process.env.DB_PASSWORD,
88
port: 5432,
99
ssl: { rejectUnauthorized: false },
10+
});
11+
12+
export const dbRestore = new ServerlessClient({
13+
user: process.env.DB_USER,
14+
host: process.env.DB_HOST_RESTORE,
15+
database: process.env.DB_NAME,
16+
password: process.env.DB_PASSWORD,
17+
port: 5432,
18+
ssl: { rejectUnauthorized: false },
1019
});

src/utils/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ export * from './isStaging'
1010
export * from './graphql'
1111
export * from './sleep'
1212
export * from './chunk'
13-
export * from './hashStringToUuid'
13+
export * from './hashStringToUuid'
14+
export * from './normalizeHtmlWithVdom'

0 commit comments

Comments
 (0)