Skip to content

Commit f8a8bd8

Browse files
authored
Add delete route and required code to clean up a vectorize job. (#259)
1 parent adf057e commit f8a8bd8

File tree

7 files changed

+785
-3
lines changed

7 files changed

+785
-3
lines changed

core/src/init.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,86 @@ pub async fn scan_job(pool: &PgPool, job_request: &VectorizeJob) -> Result<(), V
314314
Ok(())
315315
}
316316

317+
pub async fn cleanup_job(pool: &PgPool, job_name: &str) -> Result<(), VectorizeError> {
318+
// First, fetch the job details to get src_schema and src_table
319+
let job = crate::db::get_vectorize_job(pool, job_name)
320+
.await
321+
.map_err(|e| match e {
322+
VectorizeError::SqlError(sqlx::Error::RowNotFound) => {
323+
VectorizeError::NotFound(format!("Job '{}' not found", job_name))
324+
}
325+
_ => e,
326+
})?;
327+
328+
log::info!("Cleaning up job: {}", job_name);
329+
330+
// Delete pending PGMQ messages for this job
331+
// We search for messages where the job_name matches
332+
let delete_messages_query =
333+
format!("DELETE FROM pgmq.vectorize_jobs WHERE message->>'job_name' = $1");
334+
match sqlx::query(&delete_messages_query)
335+
.bind(job_name)
336+
.execute(pool)
337+
.await
338+
{
339+
Ok(result) => {
340+
log::info!(
341+
"Deleted {} pending PGMQ messages for job: {}",
342+
result.rows_affected(),
343+
job_name
344+
);
345+
}
346+
Err(e) => {
347+
log::warn!("Failed to delete PGMQ messages for job {}: {}", job_name, e);
348+
// Continue with cleanup even if PGMQ deletion fails
349+
}
350+
}
351+
352+
// Begin transaction for database resource cleanup
353+
let mut tx = pool.begin().await?;
354+
355+
// Generate cleanup SQL statements
356+
let cleanup_statements = vec![
357+
// Drop triggers first (they depend on the function and table)
358+
query::drop_event_trigger(job_name, &job.src_schema, &job.src_table, "INSERT"),
359+
query::drop_event_trigger(job_name, &job.src_schema, &job.src_table, "UPDATE"),
360+
query::drop_search_tokens_trigger(job_name, &job.src_schema, &job.src_table),
361+
// Drop trigger handler function
362+
query::drop_trigger_handler(job_name),
363+
// Drop view (depends on tables)
364+
query::drop_project_view(job_name),
365+
// Drop tables (CASCADE will handle indexes)
366+
query::drop_embeddings_table(job_name),
367+
query::drop_search_tokens_table(job_name),
368+
// Delete job record
369+
query::delete_job_record(job_name),
370+
];
371+
372+
// Execute cleanup statements
373+
for (idx, statement) in cleanup_statements.iter().enumerate() {
374+
match sqlx::query(statement).execute(&mut *tx).await {
375+
Ok(_) => {
376+
log::debug!("Executed cleanup statement {}: {}", idx + 1, statement);
377+
}
378+
Err(e) => {
379+
log::warn!(
380+
"Warning: cleanup statement {} failed (continuing): {} - Error: {}",
381+
idx + 1,
382+
statement,
383+
e
384+
);
385+
// Continue with other cleanup steps even if one fails
386+
}
387+
}
388+
}
389+
390+
// Commit transaction
391+
tx.commit().await?;
392+
393+
log::info!("Successfully cleaned up job: {}", job_name);
394+
Ok(())
395+
}
396+
317397
#[cfg(test)]
318398
mod tests {
319399
use super::*;

core/src/query.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,38 @@ pub fn drop_project_view(job_name: &str) -> String {
412412
format!("DROP VIEW IF EXISTS vectorize.{job_name}_view;")
413413
}
414414

415+
pub fn drop_embeddings_table(job_name: &str) -> String {
416+
format!("DROP TABLE IF EXISTS vectorize._embeddings_{job_name} CASCADE;")
417+
}
418+
419+
pub fn drop_search_tokens_table(job_name: &str) -> String {
420+
format!("DROP TABLE IF EXISTS vectorize._search_tokens_{job_name} CASCADE;")
421+
}
422+
423+
pub fn drop_trigger_handler(job_name: &str) -> String {
424+
format!("DROP FUNCTION IF EXISTS {TRIGGER_FN_PREFIX}{job_name}() CASCADE;")
425+
}
426+
427+
pub fn drop_event_trigger(
428+
job_name: &str,
429+
src_schema: &str,
430+
src_table: &str,
431+
event: &str,
432+
) -> String {
433+
format!(
434+
"DROP TRIGGER IF EXISTS vectorize_{event_name}_trigger_{job_name} ON {src_schema}.{src_table};",
435+
event_name = event.to_lowercase()
436+
)
437+
}
438+
439+
pub fn drop_search_tokens_trigger(job_name: &str, src_schema: &str, src_table: &str) -> String {
440+
format!("DROP TRIGGER IF EXISTS {job_name}_search_tokens_trigger ON {src_schema}.{src_table};")
441+
}
442+
443+
pub fn delete_job_record(job_name: &str) -> String {
444+
format!("DELETE FROM vectorize.job WHERE job_name = '{job_name}';")
445+
}
446+
415447
/// creates a function that can be called by trigger
416448
pub fn create_trigger_handler(job_name: &str, pkey: &str) -> String {
417449
format!(
@@ -1406,4 +1438,97 @@ EXECUTE FUNCTION vectorize.handle_update_another_job();"
14061438
assert_eq!(filter.operator, FilterOperator::Equal);
14071439
}
14081440
}
1441+
1442+
#[test]
1443+
fn test_drop_embeddings_table() {
1444+
let job_name = "test_job";
1445+
let result = drop_embeddings_table(job_name);
1446+
assert_eq!(
1447+
result,
1448+
"DROP TABLE IF EXISTS vectorize._embeddings_test_job CASCADE;"
1449+
);
1450+
}
1451+
1452+
#[test]
1453+
fn test_drop_search_tokens_table() {
1454+
let job_name = "test_job";
1455+
let result = drop_search_tokens_table(job_name);
1456+
assert_eq!(
1457+
result,
1458+
"DROP TABLE IF EXISTS vectorize._search_tokens_test_job CASCADE;"
1459+
);
1460+
}
1461+
1462+
#[test]
1463+
fn test_drop_trigger_handler() {
1464+
let job_name = "test_job";
1465+
let result = drop_trigger_handler(job_name);
1466+
assert!(result.contains("DROP FUNCTION IF EXISTS"));
1467+
assert!(result.contains("vectorize.handle_update_test_job()"));
1468+
assert!(result.contains("CASCADE"));
1469+
}
1470+
1471+
#[test]
1472+
fn test_drop_event_trigger() {
1473+
let job_name = "test_job";
1474+
let src_schema = "public";
1475+
let src_table = "my_table";
1476+
1477+
let insert_trigger = drop_event_trigger(job_name, src_schema, src_table, "INSERT");
1478+
assert_eq!(
1479+
insert_trigger,
1480+
"DROP TRIGGER IF EXISTS vectorize_insert_trigger_test_job ON public.my_table;"
1481+
);
1482+
1483+
let update_trigger = drop_event_trigger(job_name, src_schema, src_table, "UPDATE");
1484+
assert_eq!(
1485+
update_trigger,
1486+
"DROP TRIGGER IF EXISTS vectorize_update_trigger_test_job ON public.my_table;"
1487+
);
1488+
}
1489+
1490+
#[test]
1491+
fn test_drop_search_tokens_trigger() {
1492+
let job_name = "test_job";
1493+
let src_schema = "public";
1494+
let src_table = "my_table";
1495+
1496+
let result = drop_search_tokens_trigger(job_name, src_schema, src_table);
1497+
assert_eq!(
1498+
result,
1499+
"DROP TRIGGER IF EXISTS test_job_search_tokens_trigger ON public.my_table;"
1500+
);
1501+
}
1502+
1503+
#[test]
1504+
fn test_delete_job_record() {
1505+
let job_name = "test_job";
1506+
let result = delete_job_record(job_name);
1507+
assert_eq!(
1508+
result,
1509+
"DELETE FROM vectorize.job WHERE job_name = 'test_job';"
1510+
);
1511+
}
1512+
1513+
#[test]
1514+
fn test_drop_project_view() {
1515+
let job_name = "test_job";
1516+
let result = drop_project_view(job_name);
1517+
assert_eq!(result, "DROP VIEW IF EXISTS vectorize.test_job_view;");
1518+
}
1519+
1520+
#[test]
1521+
fn test_cleanup_sql_with_special_chars() {
1522+
// Test that job names with underscores work correctly
1523+
let job_name = "my_test_job_123";
1524+
1525+
let embeddings = drop_embeddings_table(job_name);
1526+
assert!(embeddings.contains("_embeddings_my_test_job_123"));
1527+
1528+
let tokens = drop_search_tokens_table(job_name);
1529+
assert!(tokens.contains("_search_tokens_my_test_job_123"));
1530+
1531+
let view = drop_project_view(job_name);
1532+
assert!(view.contains("my_test_job_123_view"));
1533+
}
14091534
}

extension/src/workers/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,19 @@ pub async fn get_vectorize_meta(
8989

9090
/// processes a single job from the queue
9191
pub async fn execute_job(dbclient: &Pool<Postgres>, msg: Message<JobMessage>) -> Result<()> {
92-
let job_meta = get_vectorize_meta(&msg.message.job_name, dbclient).await?;
92+
// Check if the job still exists - it may have been deleted
93+
let job_meta = match get_vectorize_meta(&msg.message.job_name, dbclient).await {
94+
Ok(meta) => meta,
95+
Err(DatabaseError::Db(sqlx::Error::RowNotFound)) => {
96+
warning!(
97+
"pg-vectorize: Job '{}' not found - it may have been deleted. Skipping message.",
98+
msg.message.job_name
99+
);
100+
// Return Ok to allow the message to be deleted from queue
101+
return Ok(());
102+
}
103+
Err(e) => return Err(anyhow::anyhow!("Failed to get job meta: {}", e)),
104+
};
93105
let mut job_params: JobParams = serde_json::from_value(job_meta.params.clone())?;
94106
let bpe = cl100k_base().unwrap();
95107

server/src/routes/table.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::app_state::AppState;
22
use crate::errors::ServerError;
3-
use actix_web::{HttpResponse, post, web};
3+
use actix_web::{HttpResponse, delete, post, web};
44
use serde::{Deserialize, Serialize};
55
use utoipa::ToSchema;
66
use uuid::Uuid;
@@ -59,3 +59,51 @@ pub async fn table(
5959
let resp = JobResponse { id: job_id };
6060
Ok(HttpResponse::Ok().json(resp))
6161
}
62+
63+
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
64+
pub struct DeleteJobResponse {
65+
pub job_name: String,
66+
pub message: String,
67+
}
68+
69+
#[utoipa::path(
70+
context_path = "/api/v1",
71+
responses(
72+
(
73+
status = 200, description = "Successfully deleted vectorize job",
74+
body = DeleteJobResponse,
75+
),
76+
(
77+
status = 404, description = "Job not found",
78+
),
79+
),
80+
)]
81+
#[delete("/table/{job_name}")]
82+
pub async fn delete_table(
83+
app_state: web::Data<AppState>,
84+
job_name: web::Path<String>,
85+
) -> Result<HttpResponse, ServerError> {
86+
let job_name = job_name.into_inner();
87+
88+
// Cleanup the job resources
89+
init::cleanup_job(&app_state.db_pool, &job_name)
90+
.await
91+
.map_err(|e| match e {
92+
vectorize_core::errors::VectorizeError::NotFound(msg) => {
93+
ServerError::NotFoundError(msg)
94+
}
95+
_ => ServerError::from(e),
96+
})?;
97+
98+
// Remove from cache
99+
{
100+
let mut job_cache = app_state.job_cache.write().await;
101+
job_cache.remove(&job_name);
102+
}
103+
104+
let resp = DeleteJobResponse {
105+
job_name: job_name.clone(),
106+
message: format!("Successfully deleted job '{}'", job_name),
107+
};
108+
Ok(HttpResponse::Ok().json(resp))
109+
}

server/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub fn route_config(configuration: &mut web::ServiceConfig) {
66
configuration.service(
77
web::scope("/api/v1")
88
.service(routes::table::table)
9+
.service(routes::table::delete_table)
910
.service(routes::search::search),
1011
);
1112
}

0 commit comments

Comments
 (0)