Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codegenerator/cli/npm/envio/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
},
"homepage": "https://envio.dev",
"dependencies": {
"@clickhouse/client": "1.12.1",
"@envio-dev/hypersync-client": "0.6.6",
"@envio-dev/hyperfuel-client": "1.2.2",
"rescript": "11.1.3",
Expand Down
1 change: 1 addition & 0 deletions codegenerator/cli/npm/envio/package.json.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"envio-darwin-arm64": "${version}"
},
"dependencies": {
"@clickhouse/client": "1.12.1",
"@envio-dev/hypersync-client": "0.6.6",
"@envio-dev/hyperfuel-client": "1.2.2",
"rescript": "11.1.3",
Expand Down
16 changes: 16 additions & 0 deletions codegenerator/cli/npm/envio/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions codegenerator/cli/npm/envio/src/Internal.res
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ let makeCacheTable = (~effectName) => {
Table.mkTable(
cacheTablePrefix ++ effectName,
~fields=[
Table.mkField("id", Text, ~fieldSchema=S.string, ~isPrimaryKey=true),
Table.mkField("output", JsonB, ~fieldSchema=cacheOutputSchema, ~isNullable=true),
Table.mkField("id", String, ~fieldSchema=S.string, ~isPrimaryKey=true),
Table.mkField("output", Json, ~fieldSchema=cacheOutputSchema, ~isNullable=true),
],
)
}
Expand Down
29 changes: 29 additions & 0 deletions codegenerator/cli/npm/envio/src/Mirror.res
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Storage mirror interface: a secondary write sink (e.g. ClickHouse) that
// receives the same entity-history batches as the primary storage.
type t = {
  // Sets up the mirror's schema. Invoked before the primary storage runs its
  // own initialization queries (see PgStorage.make); all arguments default to
  // empty arrays when omitted.
  initialize: (
    ~chainConfigs: array<Config.chain>=?,
    ~entities: array<Internal.entityConfig>=?,
    ~enums: array<Internal.enumConfig<Internal.enum>>=?,
  ) => promise<unit>,
  // Writes a batch of items into the mirrored table, rejecting the promise on
  // failure. Explicitly polymorphic ('item.) so one mirror value can serve
  // every entity type, with serialization driven by the provided schema.
  setOrThrow: 'item. (
    ~items: array<'item>,
    ~table: Table.table,
    ~itemSchema: S.t<'item>,
  ) => promise<unit>,
}

// Constructs a ClickHouse-backed mirror. The returned record closes over a
// single shared client instance; `database` is threaded into every call so
// the client itself stays database-agnostic.
let makeClickHouse = (~host, ~database, ~username, ~password): t => {
  // One client is created eagerly and reused by both operations.
  let chClient = ClickHouse.createClient({url: host, username, password})

  {
    // Chain configs are currently unused by the ClickHouse backend.
    initialize: (~chainConfigs as _=[], ~entities=[], ~enums=[]) =>
      ClickHouse.initialize(chClient, ~database, ~entities, ~enums),
    setOrThrow: (~items, ~table, ~itemSchema) =>
      ClickHouse.setOrThrow(chClient, ~items, ~table, ~itemSchema, ~database),
  }
}
38 changes: 32 additions & 6 deletions codegenerator/cli/npm/envio/src/PgStorage.res
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ let makeCreateTableQuery = (table: Table.table, ~pgSchema, ~isNumericArrayAsText
let fieldName = field->Table.getDbFieldName

{
`"${fieldName}" ${switch fieldType {
| Custom(name) if !(name->Js.String2.startsWith("NUMERIC(")) => `"${pgSchema}".${name}`
// Workaround for Hasura bug https://github.com/enviodev/hyperindex/issues/788
| Numeric if isArray && isNumericArrayAsText => (Table.Text :> string)
| _ => (fieldType :> string)
}}${isArray ? "[]" : ""}${switch defaultValue {
`"${fieldName}" ${Table.getPgFieldType(
~fieldType,
~pgSchema,
~isArray,
~isNullable,
~isNumericArrayAsText,
)}${switch defaultValue {
| Some(defaultValue) => ` DEFAULT ${defaultValue}`
| None => isNullable ? `` : ` NOT NULL`
}}`
Expand Down Expand Up @@ -561,6 +562,7 @@ let rec writeBatch = async (
~allEntities,
~setEffectCacheOrThrow,
~batchCache,
~mirror: option<Mirror.t>=?,
~escapeTables=?,
) => {
try {
Expand Down Expand Up @@ -685,6 +687,21 @@ let rec writeBatch = async (
),
)
->ignore

// Mirror to ClickHouse if configured
switch mirror {
| Some(mirror) =>
promises
->Js.Array2.push(
mirror.setOrThrow(
~items=batchSetUpdates,
~itemSchema=entityConfig.entityHistory.setUpdateSchema,
~table=entityConfig.entityHistory.table,
),
)
->ignore
| None => ()
}
}
}

Expand Down Expand Up @@ -863,6 +880,7 @@ let rec writeBatch = async (
~setEffectCacheOrThrow,
~batchCache,
~allEntities,
~mirror?,
)
}
}
Expand All @@ -876,6 +894,7 @@ let make = (
~pgDatabase,
~pgPassword,
~isHasuraEnabled,
~mirror: option<Mirror.t>=?,
~onInitialize=?,
~onNewTables=?,
): Persistence.storage => {
Expand Down Expand Up @@ -1007,6 +1026,12 @@ let make = (
)
}

// Call mirror.initialize before executing PG queries
switch mirror {
| Some(mirror) => await mirror.initialize(~chainConfigs, ~entities, ~enums)
| None => ()
}

let queries = makeInitializeTransaction(
~pgSchema,
~pgUser,
Expand Down Expand Up @@ -1367,6 +1392,7 @@ let make = (
~allEntities,
~setEffectCacheOrThrow,
~batchCache,
~mirror?,
)
}

Expand Down
20 changes: 20 additions & 0 deletions codegenerator/cli/npm/envio/src/Prometheus.res
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ let executeBatchDurationCounter = PromClient.Counter.makeCounter({
"labelNames": [],
})

// Cumulative milliseconds spent writing batches to storage. Incremented via
// incrementStorageWriteTimeCounter; label-free (aggregated across all chains).
let storageWriteTimeCounter = PromClient.Counter.makeCounter({
  "name": "envio_storage_write_time",
  "help": "Cumulative time spent writing batches to storage in milliseconds",
  "labelNames": [],
})

// Total number of batch writes performed against storage. Together with the
// time counter above this lets dashboards derive average write latency.
let storageWriteCounter = PromClient.Counter.makeCounter({
  "name": "envio_storage_write_count",
  "help": "Total number of batch writes to storage",
  "labelNames": [],
})

let allChainsSyncedToHead = PromClient.Gauge.makeGauge({
"name": "hyperindex_synced_to_head",
"help": "All chains fully synced",
Expand Down Expand Up @@ -213,6 +225,14 @@ let incrementExecuteBatchDurationCounter = (~duration) => {
executeBatchDurationCounter->PromClient.Counter.incMany(duration)
}

// Adds `duration` (milliseconds) to the cumulative storage-write time counter.
let incrementStorageWriteTimeCounter = (~duration) =>
  PromClient.Counter.incMany(storageWriteTimeCounter, duration)

// Bumps the total storage batch-write count by one.
let incrementStorageWriteCounter = () => PromClient.Counter.inc(storageWriteCounter)

let setSourceChainHeight = (~blockNumber, ~chainId) => {
sourceChainHeight
->PromClient.Gauge.labels({"chainId": chainId})
Expand Down
169 changes: 169 additions & 0 deletions codegenerator/cli/npm/envio/src/bindings/ClickHouse.res
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// ClickHouse client bindings for @clickhouse/client

// Opaque handle to a connected ClickHouse client.
type client

// Options for createClient. All fields are optional; the JS client supplies
// its own defaults for omitted fields (see @clickhouse/client documentation).
type clientConfig = {
  url?: string,
  database?: string,
  username?: string,
  password?: string,
}

// Parameters for exec: a raw SQL/DDL statement with no result set consumed.
type execParams = {query: string}

@module("@clickhouse/client")
external createClient: clientConfig => client = "createClient"

// Runs a statement (used here for DDL: CREATE DATABASE / CREATE TABLE / USE).
@send
external exec: (client, execParams) => promise<unit> = "exec"

// Closes the underlying connection pool.
@send
external close: client => promise<unit> = "close"

// Parameters for insert. `values` are plain JS objects serialized according
// to `format` (this module uses "JSONEachRow").
type insertParams<'a> = {
  table: string,
  values: array<'a>,
  format: string,
}

@send
external insert: (client, insertParams<'a>) => promise<unit> = "insert"

// Maps an internal Table.fieldType to the corresponding ClickHouse column
// type string, applying array and nullability wrappers.
//
// Fixes applied per review:
//  - BigInt/BigDecimal precision and scale are validated to be non-negative
//    (a negative value previously produced an invalid `Decimal(...)` type).
//  - Nullability is pushed into the array element type, since ClickHouse
//    forbids wrapping composite types: `Nullable(Array(...))` is rejected DDL.
let getClickHouseFieldType = (
  ~fieldType: Table.fieldType,
  ~isNullable: bool,
  ~isArray: bool,
): string => {
  let baseType = switch fieldType {
  | Int32 => "Int32"
  | Uint32 => "UInt32"
  | Serial => "Int32"
  | BigInt({?precision}) =>
    switch precision {
    | None => "String" // Fallback for unbounded BigInt
    | Some(precision) =>
      // ClickHouse Decimal is only valid for 0 <= precision <= 76; we cap at
      // 38 (Decimal128). Out-of-range (including negative) falls back to String.
      if precision < 0 || precision > 38 {
        "String"
      } else {
        `Decimal(${precision->Js.Int.toString},0)`
      }
    }
  | BigDecimal({?config}) =>
    switch config {
    | None => "String" // Fallback for BigDecimal without explicit (precision, scale)
    | Some((precision, scale)) =>
      // Valid only when 0 <= scale <= precision <= 38; otherwise fall back
      // to String rather than emitting an invalid Decimal type.
      if precision < 0 || scale < 0 || precision > 38 || scale > precision {
        "String"
      } else {
        `Decimal(${precision->Js.Int.toString},${scale->Js.Int.toString})`
      }
    }
  | Boolean => "Bool"
  | Number => "Float64"
  | String => "String"
  | Json => "String"
  | Date => "DateTime64(3, 'UTC')"
  | Enum(_) => "String"
  | Entity(_) => "String"
  }

  // ClickHouse does not allow Nullable(Array(...)): apply nullability to the
  // element type first, then wrap with Array.
  if isArray {
    let elementType = isNullable ? `Nullable(${baseType})` : baseType
    `Array(${elementType})`
  } else {
    isNullable ? `Nullable(${baseType})` : baseType
  }
}

// Inserts a batch of items into the given table using the JSONEachRow format.
// Empty batches are a no-op; any serialization or network failure is wrapped
// in a Persistence.StorageError so callers see a uniform storage error type.
let setOrThrow = async (
  client,
  ~items: array<'item>,
  ~table: Table.table,
  ~itemSchema: S.t<'item>,
  ~database: string,
) => {
  // Skip the round trip entirely when there is nothing to write.
  if items->Array.length > 0 {
    try {
      // Serialize each item to its wire shape via the schema, matching the
      // JSONEachRow insert format.
      let rows = items->Js.Array2.map(item => item->S.reverseConvertOrThrow(itemSchema))

      await client->insert({
        table: `${database}.\`${table.tableName}\``,
        values: rows,
        format: "JSONEachRow",
      })
    } catch {
    | exn =>
      raise(
        Persistence.StorageError({
          message: `Failed to insert items into ClickHouse table "${table.tableName}"`,
          reason: exn->Utils.prettifyExn,
        }),
      )
    }
  }
}

// Generate CREATE TABLE query for entity history table
//
// Emits one column per stored field (DerivedFrom fields have no physical
// column and are skipped), using a MergeTree engine ordered by
// (id, checkpoint id) so history rows for a given entity id stay clustered.
let makeCreateHistoryTableQuery = (entity: Internal.entityConfig, ~database: string) => {
  let historyTable = entity.entityHistory.table

  let fieldDefinitions =
    historyTable.fields
    ->Belt.Array.keepMap(field => {
      switch field {
      | Field(field) =>
        Some({
          // NOTE(review): names are interpolated straight into DDL; assumes
          // db field names never contain backticks — confirm upstream validation.
          let fieldName = field->Table.getDbFieldName
          let clickHouseType = getClickHouseFieldType(
            ~fieldType=field.fieldType,
            ~isNullable=field.isNullable,
            ~isArray=field.isArray,
          )
          `\`${fieldName}\` ${clickHouseType}`
        })
      | DerivedFrom(_) => None
      }
    })
    ->Js.Array2.joinWith(",\n  ")

  let tableName = historyTable.tableName

  `CREATE TABLE IF NOT EXISTS ${database}.\`${tableName}\` (
${fieldDefinitions}
)
ENGINE = MergeTree()
ORDER BY (id, ${EntityHistory.checkpointIdFieldName})`
}

// Initialize ClickHouse tables for entities.
//
// Fix: the previous implementation ran `DROP DATABASE IF EXISTS` on every
// bootstrap, erasing all mirrored history on each restart. Schema setup is
// now idempotent — the database is only created when absent, and the table
// DDL already uses CREATE TABLE IF NOT EXISTS.
let initialize = async (
  client,
  ~database: string,
  ~entities: array<Internal.entityConfig>,
  ~enums as _: array<Internal.enumConfig<Internal.enum>>,
) => {
  try {
    await client->exec({query: `CREATE DATABASE IF NOT EXISTS ${database}`})
    await client->exec({query: `USE ${database}`})

    // Each entity's history table is created concurrently; IF NOT EXISTS in
    // the generated DDL keeps this safe to re-run.
    await Promise.all(
      entities->Belt.Array.map(entity =>
        client->exec({query: makeCreateHistoryTableQuery(entity, ~database)})
      ),
    )->Promise.ignoreValue

    Logging.trace("ClickHouse mirror initialization completed successfully")
  } catch {
  | exn => {
      // Log the underlying cause before surfacing a uniform failure to callers.
      Logging.errorWithExn(exn, "Failed to initialize ClickHouse mirror")
      Js.Exn.raiseError("ClickHouse initialization failed")
    }
  }
}
Loading
Loading