Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cmd/msgvault/cmd/embed_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func runEmbed(ctx context.Context) error {
FullRebuild: embedFullRebuild,
Model: cfg.Vector.Embeddings.Model,
Dimension: cfg.Vector.Embeddings.Dimension,
Fingerprint: cfg.Vector.Embeddings.Fingerprint(),
Fingerprint: cfg.Vector.GenerationFingerprint(),
Confirm: func() bool {
return embedYes ||
confirmEmbed("Start a full rebuild? This builds a new generation and atomically swaps it in when complete. ")
Expand Down Expand Up @@ -80,8 +80,12 @@ func runEmbed(ctx context.Context) error {
MainDB: s.DB(),
Client: client,
Preprocess: embed.PreprocessConfig{
StripQuotes: cfg.Vector.Preprocess.StripQuotesEnabled(),
StripSignatures: cfg.Vector.Preprocess.StripSignaturesEnabled(),
StripQuotes: cfg.Vector.Preprocess.StripQuotesEnabled(),
StripSignatures: cfg.Vector.Preprocess.StripSignaturesEnabled(),
StripHTML: cfg.Vector.Preprocess.StripHTMLEnabled(),
StripBase64: cfg.Vector.Preprocess.StripBase64Enabled(),
StripURLTracking: cfg.Vector.Preprocess.StripURLTrackingEnabled(),
CollapseWhitespace: cfg.Vector.Preprocess.CollapseWhitespaceEnabled(),
},
MaxInputChars: cfg.Vector.Embeddings.MaxInputChars,
BatchSize: cfg.Vector.Embeddings.BatchSize,
Expand Down Expand Up @@ -170,12 +174,12 @@ func pickEmbedGeneration(ctx context.Context, backend vector.Backend, opts embed
if opts.Confirm != nil && !opts.Confirm() {
return 0, false, fmt.Errorf("aborted")
}
gen, err := backend.CreateGeneration(ctx, opts.Model, opts.Dimension)
gen, err := backend.CreateGeneration(ctx, opts.Model, opts.Dimension, opts.Fingerprint)
if err != nil {
return 0, false, fmt.Errorf("create generation: %w", err)
}
_, _ = fmt.Fprintf(opts.Stderr, "Building generation %d (%s:%d).\n",
gen, opts.Model, opts.Dimension)
_, _ = fmt.Fprintf(opts.Stderr, "Building generation %d (%s).\n",
gen, opts.Fingerprint)
return gen, true, nil
}

Expand Down
22 changes: 11 additions & 11 deletions cmd/msgvault/cmd/embed_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestPickEmbedGeneration_ResumesBuildingGeneration(t *testing.T) {

// Simulate an interrupted full rebuild: a building generation
// exists but no active generation.
gen, err := b.CreateGeneration(ctx, "fake", 4)
gen, err := b.CreateGeneration(ctx, "fake", 4, "")
if err != nil {
t.Fatalf("CreateGeneration: %v", err)
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestPickEmbedGeneration_NoGenerations_HintsFullRebuild(t *testing.T) {
func TestPickEmbedGeneration_ResumeFingerprintMismatch(t *testing.T) {
ctx := context.Background()
b := openTestBackend(t)
if _, err := b.CreateGeneration(ctx, "old-model", 4); err != nil {
if _, err := b.CreateGeneration(ctx, "old-model", 4, ""); err != nil {
t.Fatalf("CreateGeneration: %v", err)
}

Expand Down Expand Up @@ -173,14 +173,14 @@ func TestPickEmbedGeneration_PrefersBuildingOverActive_MatchingFingerprint(t *te
// Build state: an active generation exists, and a second building
// generation has been created for the SAME model+dim (the typical
// "I want to refresh my index" pattern).
activeGen, err := b.CreateGeneration(ctx, "fake", 4)
activeGen, err := b.CreateGeneration(ctx, "fake", 4, "")
if err != nil {
t.Fatalf("CreateGeneration (active): %v", err)
}
if err := b.ActivateGeneration(ctx, activeGen); err != nil {
t.Fatalf("ActivateGeneration: %v", err)
}
buildingGen, err := b.CreateGeneration(ctx, "fake", 4)
buildingGen, err := b.CreateGeneration(ctx, "fake", 4, "")
if err != nil {
t.Fatalf("CreateGeneration (building): %v", err)
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestPickEmbedGeneration_RejectsBuildingWithMismatchedFingerprint(t *testing

// State: building generation exists for an old model. No active
// generation, and config now points at a different model.
if _, err := b.CreateGeneration(ctx, "old-model", 4); err != nil {
if _, err := b.CreateGeneration(ctx, "old-model", 4, ""); err != nil {
t.Fatalf("CreateGeneration (building): %v", err)
}

Expand Down Expand Up @@ -249,14 +249,14 @@ func TestPickEmbedGeneration_StaleActivePlusMatchingBuilding(t *testing.T) {
ctx := context.Background()
b := openTestBackend(t)

staleActive, err := b.CreateGeneration(ctx, "old-model", 4)
staleActive, err := b.CreateGeneration(ctx, "old-model", 4, "")
if err != nil {
t.Fatalf("CreateGeneration (stale active): %v", err)
}
if err := b.ActivateGeneration(ctx, staleActive); err != nil {
t.Fatalf("ActivateGeneration: %v", err)
}
matchingBuilding, err := b.CreateGeneration(ctx, "new-model", 4)
matchingBuilding, err := b.CreateGeneration(ctx, "new-model", 4, "")
if err != nil {
t.Fatalf("CreateGeneration (matching building): %v", err)
}
Expand Down Expand Up @@ -292,14 +292,14 @@ func TestPickEmbedGeneration_ActivePlusMismatchedBuildingRejected(t *testing.T)
ctx := context.Background()
b := openTestBackend(t)

matchingActive, err := b.CreateGeneration(ctx, "fake", 4)
matchingActive, err := b.CreateGeneration(ctx, "fake", 4, "")
if err != nil {
t.Fatalf("CreateGeneration (active): %v", err)
}
if err := b.ActivateGeneration(ctx, matchingActive); err != nil {
t.Fatalf("ActivateGeneration: %v", err)
}
if _, err := b.CreateGeneration(ctx, "old-model", 4); err != nil {
if _, err := b.CreateGeneration(ctx, "old-model", 4, ""); err != nil {
t.Fatalf("CreateGeneration (stale building): %v", err)
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func TestPickEmbedGeneration_ResumeReseedsUnseededBuilding(t *testing.T) {

// Step 1: create a building gen the normal way (which seeds + marks
// seeded_at).
gen, err := b.CreateGeneration(ctx, "fake", 4)
gen, err := b.CreateGeneration(ctx, "fake", 4, "")
if err != nil {
t.Fatalf("CreateGeneration: %v", err)
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestPickEmbedGeneration_ResumeRacesActivation(t *testing.T) {
// Create the building generation as if the operator had just run
// `msgvault build-embeddings --full-rebuild`. CreateGeneration seeds pending
// rows for id=1 via openTestBackend's seed message.
gen, err := b.CreateGeneration(ctx, "fake", 4)
gen, err := b.CreateGeneration(ctx, "fake", 4, "")
if err != nil {
t.Fatalf("CreateGeneration: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/msgvault/cmd/search_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func runHybridSearch(cmd *cobra.Command, queryStr, mode string, explain bool, sc
}
defer func() { _ = backend.Close() }()

active, err := vector.ResolveActive(ctx, backend, cfg.Vector.Embeddings)
active, err := vector.ResolveActiveForFingerprint(ctx, backend, cfg.Vector.GenerationFingerprint())
if err != nil {
return fmt.Errorf("resolve active generation: %w", err)
}
Expand All @@ -81,7 +81,7 @@ func runHybridSearch(cmd *cobra.Command, queryStr, mode string, explain bool, sc
})

eng := hybrid.NewEngine(backend, mainDB, embedClient, hybrid.Config{
ExpectedFingerprint: cfg.Vector.Embeddings.Fingerprint(),
ExpectedFingerprint: cfg.Vector.GenerationFingerprint(),
RRFK: cfg.Vector.Search.RRFK,
KPerSignal: cfg.Vector.Search.KPerSignal,
SubjectBoost: cfg.Vector.Search.SubjectBoost,
Expand Down
35 changes: 20 additions & 15 deletions cmd/msgvault/cmd/search_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,25 @@ func newVectorSearchTestEnv(t *testing.T, embedSrvURL string) (*store.Store, fun
if err != nil {
t.Fatalf("sqlitevec.Open: %v", err)
}
gid, err := b.CreateGeneration(ctx, "fake-model", 4)
// Build the vector config first so the generation can be seeded
// with the same fingerprint the search command will compute at run
// time. Otherwise ResolveActiveForFingerprint rejects the active
// generation as stale before the search ever reaches the index.
vecCfg := vector.Config{
Enabled: true,
Backend: "sqlite-vec",
DBPath: vecPath,
Embeddings: vector.EmbeddingsConfig{
Endpoint: embedSrvURL + "/v1",
Model: "fake-model",
Dimension: 4,
},
Search: vector.SearchConfig{
RRFK: 60,
KPerSignal: 10,
},
}
gid, err := b.CreateGeneration(ctx, "fake-model", 4, vecCfg.GenerationFingerprint())
if err != nil {
_ = b.Close()
t.Fatalf("CreateGeneration: %v", err)
Expand All @@ -73,20 +91,7 @@ func newVectorSearchTestEnv(t *testing.T, embedSrvURL string) (*store.Store, fun
cfg = &config.Config{
HomeDir: dir,
Data: config.DataConfig{DataDir: dir},
Vector: vector.Config{
Enabled: true,
Backend: "sqlite-vec",
DBPath: vecPath,
Embeddings: vector.EmbeddingsConfig{
Endpoint: embedSrvURL + "/v1",
Model: "fake-model",
Dimension: 4,
},
Search: vector.SearchConfig{
RRFK: 60,
KPerSignal: 10,
},
},
Vector: vecCfg,
}

restore := func() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/msgvault/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func runServe(cmd *cobra.Command, args []string) error {
Worker: vf.Worker,
Backend: vf.Backend,
VectorsDB: vf.VectorsDB,
Fingerprint: vf.Cfg.Embeddings.Fingerprint(),
Fingerprint: vf.Cfg.GenerationFingerprint(),
Log: logger,
}
schedule := cfg.Vector.Embed.Schedule.Cron
Expand Down
10 changes: 7 additions & 3 deletions cmd/msgvault/cmd/serve_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ func setupVectorFeatures(ctx context.Context, mainDB *sql.DB, mainPath string) (
MainDB: mainDB,
Client: client,
Preprocess: embed.PreprocessConfig{
StripQuotes: cfg.Vector.Preprocess.StripQuotesEnabled(),
StripSignatures: cfg.Vector.Preprocess.StripSignaturesEnabled(),
StripQuotes: cfg.Vector.Preprocess.StripQuotesEnabled(),
StripSignatures: cfg.Vector.Preprocess.StripSignaturesEnabled(),
StripHTML: cfg.Vector.Preprocess.StripHTMLEnabled(),
StripBase64: cfg.Vector.Preprocess.StripBase64Enabled(),
StripURLTracking: cfg.Vector.Preprocess.StripURLTrackingEnabled(),
CollapseWhitespace: cfg.Vector.Preprocess.CollapseWhitespaceEnabled(),
},
MaxInputChars: cfg.Vector.Embeddings.MaxInputChars,
BatchSize: cfg.Vector.Embeddings.BatchSize,
Expand All @@ -72,7 +76,7 @@ func setupVectorFeatures(ctx context.Context, mainDB *sql.DB, mainPath string) (
})

engine := hybrid.NewEngine(backend, mainDB, client, hybrid.Config{
ExpectedFingerprint: cfg.Vector.Embeddings.Fingerprint(),
ExpectedFingerprint: cfg.Vector.GenerationFingerprint(),
RRFK: cfg.Vector.Search.RRFK,
KPerSignal: cfg.Vector.Search.KPerSignal,
SubjectBoost: cfg.Vector.Search.SubjectBoost,
Expand Down
2 changes: 1 addition & 1 deletion internal/api/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,7 @@ type fakeVectorBackend struct {
searchErr error
}

func (f *fakeVectorBackend) CreateGeneration(_ context.Context, _ string, _ int) (vector.GenerationID, error) {
func (f *fakeVectorBackend) CreateGeneration(_ context.Context, _ string, _ int, _ string) (vector.GenerationID, error) {
return 0, errors.New("not implemented")
}
func (f *fakeVectorBackend) ActivateGeneration(_ context.Context, _ vector.GenerationID) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/mcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,7 @@ func (f *fakeBackend) ActiveGeneration(_ context.Context) (vector.Generation, er
func (f *fakeBackend) Search(_ context.Context, _ vector.GenerationID, _ []float32, _ int, _ vector.Filter) ([]vector.Hit, error) {
return f.searchHits, f.searchErr
}
func (f *fakeBackend) CreateGeneration(_ context.Context, _ string, _ int) (vector.GenerationID, error) {
func (f *fakeBackend) CreateGeneration(_ context.Context, _ string, _ int, _ string) (vector.GenerationID, error) {
return 0, errors.New("not implemented")
}
func (f *fakeBackend) ActivateGeneration(_ context.Context, _ vector.GenerationID) error {
Expand Down
25 changes: 20 additions & 5 deletions internal/scheduler/embed_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@ type EmbedJob struct {
// case the daemon will not auto-activate building generations.
VectorsDB *sql.DB

// Fingerprint is the configured "<model>:<dim>" string. When set,
// a building generation whose fingerprint differs is left alone
// (CLI is the only entry point that can resolve a mismatch). When
// empty, the daemon falls back to "any building generation".
// Fingerprint is the configured generation fingerprint (typically
// vector.Config.GenerationFingerprint() — "model:dim:preprocess").
// When set, a building OR active generation whose fingerprint
// differs is left alone: the CLI is the only entry point that can
// resolve a mismatch (`build-embeddings --full-rebuild` or retire).
// When empty, no fingerprint comparison is possible: the active
// generation is used as-is, while a building generation, if one is
// present, is still refused (see pickTarget for the reasoning).
Fingerprint string

// running guards against overlapping Run calls (cron fires while a
Expand Down Expand Up @@ -161,7 +166,12 @@ func (j *EmbedJob) Run(ctx context.Context) {
// 2. Mismatched building generation — log and bail. Resolution
// requires the CLI (`msgvault build-embeddings --full-rebuild` or retire),
// not the daemon.
// 3. Active generation — incremental top-up.
// 3. Active generation whose fingerprint matches config — incremental
// top-up. A mismatched active fingerprint is treated the same as a
// mismatched building: log and bail. Topping it up would let the
// daemon embed new messages under the current preprocessing policy
// into an index whose existing vectors used a different policy,
// silently mixing two embedding spaces in one generation.
//
// The bool is false when there's nothing to do or a lookup error
// occurred (already logged); the caller should return.
Expand Down Expand Up @@ -195,6 +205,11 @@ func (j *EmbedJob) pickTarget(ctx context.Context, log *slog.Logger) (vector.Gen
active, err := j.Backend.ActiveGeneration(ctx)
switch {
case err == nil:
if j.Fingerprint != "" && active.Fingerprint != j.Fingerprint {
log.Warn("embed: active generation fingerprint differs from config — leaving for CLI to resolve",
"active_fingerprint", active.Fingerprint, "config_fingerprint", j.Fingerprint)
return 0, false, false
}
return active.ID, false, true
case errors.Is(err, vector.ErrNoActiveGeneration):
return 0, false, false // nothing to do
Expand Down
51 changes: 50 additions & 1 deletion internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (f *fakeBackend) BuildingGeneration(ctx context.Context) (*vector.Generatio
return f.building, f.buildErr
}

func (f *fakeBackend) CreateGeneration(ctx context.Context, model string, dim int) (vector.GenerationID, error) {
func (f *fakeBackend) CreateGeneration(ctx context.Context, model string, dim int, fp string) (vector.GenerationID, error) {
panic("unexpected: CreateGeneration")
}
func (f *fakeBackend) ActivateGeneration(ctx context.Context, gen vector.GenerationID) error {
Expand Down Expand Up @@ -602,6 +602,55 @@ func TestEmbedJob_Run_ActiveGeneration(t *testing.T) {
}
}

// TestEmbedJob_Run_ActiveGenerationFingerprintMismatch verifies that Run
// does no work against an active generation whose stored fingerprint is
// not the one the job was configured with.
//
// Such a mismatch means the operator changed model, dimension, or
// preprocessing policy without running --full-rebuild. Topping the
// generation up would embed new messages under the current policy next
// to vectors produced under a different one, silently mixing two
// embedding spaces in a single generation. The expected outcome is the
// same refusal a mismatched in-flight build gets: no RunOnce call and no
// activation.
func TestEmbedJob_Run_ActiveGenerationFingerprintMismatch(t *testing.T) {
	const (
		storedFP     = "old-model:768:p1-111111"
		configuredFP = "new-model:768:p1-111111"
	)

	be := &fakeBackend{
		active: vector.Generation{
			ID:          7,
			State:       vector.GenerationActive,
			Fingerprint: storedFP,
		},
	}
	worker := &fakeRunner{}
	j := &EmbedJob{
		Worker:      worker,
		Backend:     be,
		Fingerprint: configuredFP,
	}

	j.Run(context.Background())

	// The worker must never have been asked to embed anything.
	if _, runs, _ := worker.calls(); runs != 0 {
		t.Errorf("RunOnce calls = %d, want 0 (refuse to top up mismatched active)", runs)
	}

	// Nor may the job have tried to activate any generation.
	activated := be.activations()
	if len(activated) != 0 {
		t.Errorf("ActivateGeneration calls = %v, want none", activated)
	}
}

// TestEmbedJob_Run_ActiveGenerationFingerprintMatch is the positive
// counterpart of the mismatch case: when the active generation's
// fingerprint is exactly the configured one, Run must keep topping it
// up, i.e. invoke the worker once against that generation.
func TestEmbedJob_Run_ActiveGenerationFingerprintMatch(t *testing.T) {
	const (
		sharedFP = "m:768:p1-111111"
		wantGen  = 9
	)

	be := &fakeBackend{
		active: vector.Generation{
			ID:          wantGen,
			State:       vector.GenerationActive,
			Fingerprint: sharedFP,
		},
	}
	worker := &fakeRunner{}
	j := &EmbedJob{
		Worker:      worker,
		Backend:     be,
		Fingerprint: sharedFP,
	}

	j.Run(context.Background())

	_, runs, gotGen := worker.calls()
	if runs != 1 {
		t.Errorf("RunOnce calls = %d, want 1 (matching active should top up)", runs)
	}
	if gotGen != wantGen {
		t.Errorf("RunOnce gen = %d, want 9", gotGen)
	}
}

func TestEmbedJob_Run_BuildingRefusedWithoutFingerprint(t *testing.T) {
// A daemon with no configured Fingerprint cannot tell whether a
// building generation matches the model it is supposed to be
Expand Down
20 changes: 15 additions & 5 deletions internal/vector/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ const (
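// Generation describes an index generation — a complete corpus
// embedding under one model+dimension and, through Fingerprint, whatever
// else the caller folded into it (typically the preprocessing policy).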
type Generation struct {
ID GenerationID
Model string
Dimension int
Fingerprint string // Model:Dimension
ID GenerationID
Model string
Dimension int
// Fingerprint is the opaque identifier supplied by the caller at
// CreateGeneration time (typically Config.GenerationFingerprint(),
// which folds the preprocessing policy into the model+dimension
// pair). Callers compare equality only — do not parse it.
Fingerprint string
State GenerationState
StartedAt time.Time
CompletedAt *time.Time
Expand Down Expand Up @@ -113,7 +117,13 @@ type Stats struct {

// Backend is the minimum contract a vector store must implement.
type Backend interface {
CreateGeneration(ctx context.Context, model string, dimension int) (GenerationID, error)
// CreateGeneration starts (or resumes) a building generation.
// fingerprint is stored verbatim on the row; pass
// Config.GenerationFingerprint() so a later config change (model,
// dimension, or any preprocess toggle) trips
// ResolveActiveForFingerprint and forces a --full-rebuild instead
// of silently mixing inconsistently-prepared vectors.
CreateGeneration(ctx context.Context, model string, dimension int, fingerprint string) (GenerationID, error)
ActivateGeneration(ctx context.Context, gen GenerationID) error
RetireGeneration(ctx context.Context, gen GenerationID) error

Expand Down
Loading
Loading