Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ make lint # Run linter
./msgvault stats # Show archive stats

# Facebook Messenger import
./msgvault import-messenger --me you@facebook.messenger ~/facebook-dyi # Import Facebook Messenger DYI

# Apple Mail import
./msgvault import-emlx # Auto-discover accounts
./msgvault import-emlx ~/Library/Mail # Explicit mail directory
./msgvault import-emlx --account me@gmail.com # Specific account(s)
Expand Down Expand Up @@ -196,6 +197,10 @@ The TUI automatically builds/updates the Parquet cache on launch when new messag

Sync is **read-only** - no modifications to Gmail.

## Test Data

Never use real people's names, email addresses, or identifiers in test fixtures. Use obviously synthetic names: `alice`, `bob`, `Test User`, `user@example.com`. Before committing test data, verify no real PII is present.

## Go Development

After making any Go code changes, always run `go fmt ./...` and `go vet ./...` before committing. Stage ALL resulting changes, including formatting-only files.
Expand Down
84 changes: 84 additions & 0 deletions cmd/msgvault/cmd/build_cache_messenger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cmd

import (
"context"
"database/sql"
"path/filepath"
"testing"

_ "github.com/marcboeker/go-duckdb"
"github.com/wesm/msgvault/internal/fbmessenger"
"github.com/wesm/msgvault/internal/store"
)

// TestBuildCache_AfterMessengerImport imports the json_simple Messenger
// fixture into a fresh store, runs buildCache over that database, and reads
// the resulting Parquet files back through DuckDB. It checks that the cache
// holds all four imported messages and that "fbmessenger" is the only
// message_type present in it.
func TestBuildCache_AfterMessengerImport(t *testing.T) {
	tmp := t.TempDir()
	dbPath := filepath.Join(tmp, "msgvault.db")
	analyticsDir := filepath.Join(tmp, "analytics")

	st, err := store.Open(dbPath)
	if err != nil {
		t.Fatalf("open store: %v", err)
	}
	if err := st.InitSchema(); err != nil {
		t.Fatalf("init schema: %v", err)
	}

	fixture, err := filepath.Abs("../../../internal/fbmessenger/testdata/json_simple")
	if err != nil {
		t.Fatal(err)
	}
	// NOTE(review): the project's test-data guideline asks for obviously
	// synthetic identifiers; confirm what the fixture expects before renaming
	// this value.
	summary, err := fbmessenger.ImportDYI(context.Background(), st, fbmessenger.ImportOptions{
		Me:             "wes@facebook.messenger",
		RootDir:        fixture,
		Format:         "auto",
		AttachmentsDir: t.TempDir(),
	})
	if err != nil {
		t.Fatalf("ImportDYI: %v", err)
	}
	// buildCache is handed the database path rather than this handle, so
	// release the handle before calling it.
	if err := st.Close(); err != nil {
		t.Fatal(err)
	}
	if summary.MessagesAdded != 4 {
		t.Fatalf("imported %d want 4", summary.MessagesAdded)
	}

	result, err := buildCache(dbPath, analyticsDir, false)
	if err != nil {
		t.Fatalf("buildCache: %v", err)
	}
	if result.Skipped {
		t.Fatal("buildCache unexpectedly skipped")
	}

	duckdb, err := sql.Open("duckdb", "")
	if err != nil {
		t.Fatalf("open duckdb: %v", err)
	}
	defer func() { _ = duckdb.Close() }()

	pattern := filepath.Join(analyticsDir, "messages", "**", "*.parquet")

	var n int
	if err := duckdb.QueryRow(
		`SELECT COUNT(*) FROM read_parquet(?, hive_partitioning=true)`, pattern,
	).Scan(&n); err != nil {
		t.Fatalf("duckdb scan: %v", err)
	}
	if n != 4 {
		t.Errorf("parquet messages=%d want 4", n)
	}

	// Read every distinct message_type instead of only the first row: a
	// single-row scan would let a stray second type slip through, and which
	// row a DISTINCT query returns first is unspecified.
	rows, err := duckdb.Query(
		`SELECT DISTINCT message_type FROM read_parquet(?, hive_partitioning=true)`, pattern,
	)
	if err != nil {
		t.Fatalf("duckdb message_type: %v", err)
	}
	defer func() { _ = rows.Close() }()

	var mtypes []string
	for rows.Next() {
		var mtype string
		if err := rows.Scan(&mtype); err != nil {
			t.Fatalf("duckdb message_type: %v", err)
		}
		mtypes = append(mtypes, mtype)
	}
	if err := rows.Err(); err != nil {
		t.Fatalf("duckdb message_type: %v", err)
	}
	if len(mtypes) != 1 || mtypes[0] != "fbmessenger" {
		t.Errorf("message_type=%q want only \"fbmessenger\"", mtypes)
	}
}
156 changes: 156 additions & 0 deletions cmd/msgvault/cmd/import_messenger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/wesm/msgvault/internal/fbmessenger"
"github.com/wesm/msgvault/internal/store"
)

// Values of the import-messenger command-line flags. They are bound to the
// command's flag set in init and read by runImportMessenger.
var (
	// --me and --format.
	importMessengerMe, importMessengerFormat string

	// --limit and --checkpoint-interval.
	importMessengerLimit, importMessengerCheckpointEvery int

	// --no-resume.
	importMessengerNoResume bool
)

// importMessengerCmd is the "import-messenger" subcommand. It takes exactly
// one positional argument, the DYI export directory, and passes it to
// runImportMessenger once MustBeLocal has returned no error.
var importMessengerCmd = &cobra.Command{
Use: "import-messenger <dyi-export-dir>",
Short: "Import Facebook Messenger from a Download Your Information export",
Long: `Import Facebook Messenger conversations from a DYI export (JSON or HTML).

Both JSON and HTML DYI formats are supported. When a thread contains both, the
JSON form wins because it preserves timestamps at millisecond precision and
reactions with relational fidelity. Use --format both to import both copies
into a single conversation with disambiguated source_message_id values.

Participants are synthesized as <slug>@facebook.messenger addresses. Two
participants whose display names produce the same slug are merged with a
warning — DYI exports do not expose stable user IDs, so this is the best we
can do without false-splitting one person into two phantom participants.

Your own identifier is required via --me and must itself be a
<slug>@facebook.messenger address; this value becomes the source identifier
and drives is_from_me on outbound messages.

HTML exports do not expose timezone information; timestamps are stored as
UTC. JSON exports have millisecond-precision timestamps that are preserved
verbatim.

Examples:
msgvault import-messenger --me test.user@facebook.messenger ~/downloads/facebook-export
msgvault import-messenger --me test.user@facebook.messenger --format both ./dyi
msgvault import-messenger --me test.user@facebook.messenger --limit 100 ./dyi
`,
// The export directory is the only positional argument.
Args: cobra.ExactArgs(1),
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
// NOTE(review): MustBeLocal is defined elsewhere in the package; presumably
// it rejects commands that need direct access to a local database — confirm.
if err := MustBeLocal("import-messenger"); err != nil {
return err
}
return runImportMessenger(cmd, args[0])
},
}

// runImportMessenger imports the Messenger DYI export rooted at rootDir into
// the configured database and prints a summary to the command's stdout.
//
// It returns an error when rootDir cannot be stat'ed or is not a directory,
// when the database cannot be opened or its schema initialized, or when the
// import fails while the context is still live. If the import fails after the
// context was cancelled (SIGINT, SIGTERM, or cancellation of the command's
// own context), the interruption is reported to the user and nil is returned.
// rebuildCacheAfterWrite is called after both a completed and an interrupted
// import.
func runImportMessenger(cmd *cobra.Command, rootDir string) error {
	info, statErr := os.Stat(rootDir)
	switch {
	case statErr != nil:
		return fmt.Errorf("source directory not found: %w", statErr)
	case !info.IsDir():
		return fmt.Errorf("source path is not a directory: %s", rootDir)
	}

	dbPath := cfg.DatabaseDSN()
	s, err := store.Open(dbPath)
	if err != nil {
		return fmt.Errorf("open database: %w", err)
	}
	defer func() { _ = s.Close() }()

	if schemaErr := s.InitSchema(); schemaErr != nil {
		return fmt.Errorf("init schema: %w", schemaErr)
	}

	ctx, cancel := context.WithCancel(cmd.Context())
	defer cancel()

	// Turn SIGINT/SIGTERM into a context cancellation. The watcher goroutine
	// exits either on the first signal or when ctx is done, which the
	// deferred cancel guarantees on return.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(interrupts)
	go func() {
		select {
		case <-ctx.Done():
		case <-interrupts:
			_, _ = fmt.Fprintln(cmd.ErrOrStderr(), "\nInterrupted. Saving checkpoint...")
			cancel()
		}
	}()

	out := cmd.OutOrStdout()

	_, _ = fmt.Fprintf(out, "Importing Facebook Messenger DYI from %s\n", rootDir)
	_, _ = fmt.Fprintf(out, "Me: %s\n", importMessengerMe)
	_, _ = fmt.Fprintln(out)

	summary, err := fbmessenger.ImportDYI(ctx, s, fbmessenger.ImportOptions{
		Me:              importMessengerMe,
		RootDir:         rootDir,
		Format:          importMessengerFormat,
		AttachmentsDir:  cfg.AttachmentsDir(),
		Limit:           importMessengerLimit,
		NoResume:        importMessengerNoResume,
		CheckpointEvery: importMessengerCheckpointEvery,
		Logger:          logger,
	})
	if err != nil {
		if ctx.Err() == nil {
			return fmt.Errorf("import failed: %w", err)
		}
		// Cancelled rather than failed: tell the user how to continue and
		// still refresh the cache for whatever was written.
		_, _ = fmt.Fprintln(out, "\nImport interrupted. Re-run to continue.")
		rebuildCacheAfterWrite(dbPath)
		return nil
	}

	_, _ = fmt.Fprintln(out)
	if summary.WasResumed {
		_, _ = fmt.Fprintln(out, "Resumed from checkpoint.")
	}
	_, _ = fmt.Fprintln(out, "Import complete!")
	_, _ = fmt.Fprintf(out, " Duration: %s\n", summary.Duration.Round(time.Millisecond))
	_, _ = fmt.Fprintf(out, " Threads: %d processed, %d skipped\n",
		summary.ThreadsProcessed, summary.ThreadsSkipped)
	_, _ = fmt.Fprintf(out, " Files skipped: %d (unrecognized siblings)\n", summary.FilesSkipped)
	_, _ = fmt.Fprintf(out, " Messages: %d processed, %d added, %d skipped\n",
		summary.MessagesProcessed, summary.MessagesAdded, summary.MessagesSkipped)
	_, _ = fmt.Fprintf(out, " Participants: %d\n", summary.ParticipantsResolved)
	_, _ = fmt.Fprintf(out, " Attachments: %d found, %d stored\n", summary.AttachmentsFound, summary.AttachmentsStored)
	_, _ = fmt.Fprintf(out, " Reactions: %d\n", summary.ReactionsAdded)
	if summary.Errors > 0 {
		_, _ = fmt.Fprintf(out, " Errors: %d\n", summary.Errors)
	}

	// Messages were added but none were attributed to --me: the identifier
	// most likely does not match the export, so show the slug it maps to.
	noneFromMe := summary.MessagesAdded > 0 && summary.FromMeCount == 0
	if noneFromMe {
		meSlug := fbmessenger.Slug(fbmessenger.StripDomain(importMessengerMe))
		_, _ = fmt.Fprintf(out,
			"\n Warning: no messages matched --me %q (slug: %q).\n"+
				" The --me value must match the slug of your display name in the export.\n",
			importMessengerMe, meSlug)
	}

	rebuildCacheAfterWrite(dbPath)
	return nil
}

// init declares the import-messenger flags, marks --me as required, and
// attaches the command to rootCmd.
func init() {
	flags := importMessengerCmd.Flags()
	flags.StringVar(&importMessengerMe, "me", "", "your <slug>@facebook.messenger identifier (required)")
	flags.StringVar(&importMessengerFormat, "format", "auto", "format to import: auto|json|html|both")
	flags.IntVar(&importMessengerLimit, "limit", 0, "limit number of messages (for testing)")
	flags.BoolVar(&importMessengerNoResume, "no-resume", false, "ignore any existing checkpoint and start fresh")
	flags.IntVar(&importMessengerCheckpointEvery, "checkpoint-interval", 200, "checkpoint every N messages")

	// The "me" flag is defined just above, so the lookup inside
	// MarkFlagRequired is not expected to fail; the error is ignored.
	_ = importMessengerCmd.MarkFlagRequired("me")

	rootCmd.AddCommand(importMessengerCmd)
}
Loading
Loading