Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 109 additions & 153 deletions src/powermem/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def add(

def _simple_add(
self,
messages,
messages: List[Union[str, Dict[str, Any]]],
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
Expand All @@ -538,107 +538,122 @@ def _simple_add(
memory_type: Optional[str] = None,
prompt: Optional[str] = None,
) -> Dict[str, Any]:
"""Simple add mode: direct storage without intelligence."""
# Parse messages into content
if isinstance(messages, str):
content = messages
elif isinstance(messages, dict):
content = messages.get("content", "")
elif isinstance(messages, list):
content = "\n".join([msg.get("content", "") for msg in messages if isinstance(msg, dict) and msg.get("content")])
else:
raise ValueError("messages must be str, dict, or list[dict]")

# Validate content is not empty
if not content or not content.strip():
logger.error(f"Cannot store empty content. Messages: {messages}")
raise ValueError(f"Cannot create memory with empty content. Original messages: {messages}")

# Select embedding service based on metadata (for sub-store routing)
embedding_service = self._get_embedding_service(metadata)
"""
Add messages to memory by processing each message,
generating embedding, hashing, metadata, logging, telemetry, and graph storage.
"""

# Generate embedding
embedding = embedding_service.embed(content, memory_action="add")

# Disabled LLM-based importance evaluation to save tokens
# Process with intelligence manager
# enhanced_metadata = self.intelligence.process_metadata(content, metadata)
enhanced_metadata = metadata # Use original metadata without LLM evaluation

# Intelligent plugin annotations
extra_fields = {}
if self._intelligence_plugin and self._intelligence_plugin.enabled:
extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata)

created_results = []

# Generate content hash for deduplication
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
for message in messages:
# Normalize message content and metadata
if isinstance(message, str):
message_content = message
message_metadata = {}
elif isinstance(message, dict):
message_content = message.get("content") or message.get("text") or ""
message_metadata = message.get("metadata", {})
else:
raise ValueError("Each message must be a string or dict")

# Extract category from enhanced metadata if present
category = ""
if enhanced_metadata and isinstance(enhanced_metadata, dict):
category = enhanced_metadata.get("category", "")
# Remove category from metadata to avoid duplication
enhanced_metadata = {k: v for k, v in enhanced_metadata.items() if k != "category"}
# Combine global and message-level metadata and filters
combined_metadata = dict(metadata or {})
combined_metadata.update(message_metadata)
if filters:
combined_metadata.update(filters)

# Final validation before storage
if not content or not content.strip():
raise ValueError(f"Refusing to store empty content. Original messages: {messages}")

# Use self.agent_id as fallback if agent_id is not provided
agent_id = agent_id or self.agent_id

# Store in database
memory_data = {
"content": content,
"embedding": embedding,
"user_id": user_id,
"agent_id": agent_id,
"run_id": run_id,
"hash": content_hash,
"category": category,
"metadata": enhanced_metadata or {},
"filters": filters or {},
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
_user_id = user_id or message.get("user_id")
_agent_id = agent_id or message.get("agent_id") or self.agent_id

if extra_fields:
memory_data.update(extra_fields)

memory_id = self.storage.add_memory(memory_data)

# Log audit event
self.audit.log_event("memory.add", {
"memory_id": memory_id,
"user_id": user_id,
"agent_id": agent_id,
"content_length": len(content)
}, user_id=user_id, agent_id=agent_id)

# Capture telemetry
self.telemetry.capture_event("memory.add", {
"memory_id": memory_id,
"user_id": user_id,
"agent_id": agent_id
})

graph_result = self._add_to_graph(messages, filters, user_id, agent_id, run_id)

result: Dict[str, Any] = {
"results": [{
"id": memory_id,
"memory": content,
"event": "ADD",
"user_id": user_id,
"agent_id": agent_id,
content = message_content.strip()
if not content:
logger.error(f"Cannot store empty content. Message: {message}")
raise ValueError("Cannot create memory for empty content")

# Select embedding service and generate embedding
embedding_service = self._get_embedding_service(combined_metadata)
embedding = embedding_service.embed(content, memory_action="add")

# Metadata enhancement (LLM-based intelligence plugin)
enhanced_metadata = combined_metadata or {}

extra_fields = {}
if self._intelligence_plugin and self._intelligence_plugin.enabled:
extra_fields = self._intelligence_plugin.on_add(content=content, metadata=enhanced_metadata)

# Content hash for deduplication
content_hash = hashlib.md5(content.encode("utf-8")).hexdigest()

# Extract category from metadata if present
category = ""
if enhanced_metadata and isinstance(enhanced_metadata, dict):
category = enhanced_metadata.get("category", "")
# Remove category from enhanced metadata to avoid duplication
enhanced_metadata = {k: v for k, v in enhanced_metadata.items() if k != "category"}

# Compose memory data for storage
memory_data = {
"content": content,
"embedding": embedding,
"user_id": _user_id,
"agent_id": _agent_id,
"run_id": run_id,
"metadata": metadata,
"created_at": memory_data["created_at"].isoformat() if isinstance(memory_data["created_at"], datetime) else memory_data["created_at"],
}]
}
"hash": content_hash,
"category": category,
"metadata": enhanced_metadata,
"scope": scope,
"memory_type": memory_type,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}
if extra_fields:
memory_data.update(extra_fields)

# Store memory and get ID
memory_id = self.storage.add_memory(memory_data)

# Audit logging
self.audit.log_event(
"memory.add",
{
"memory_id": memory_id,
"user_id": _user_id,
"agent_id": _agent_id,
"content_length": len(content),
},
user_id=_user_id,
agent_id=_agent_id,
)

# Telemetry capture
self.telemetry.capture_event(
"memory.add",
{
"memory_id": memory_id,
"user_id": _user_id,
"agent_id": _agent_id,
},
)

# Append creation result
created_results.append(
{
"id": memory_id,
"memory": content,
"user_id": _user_id,
"agent_id": _agent_id,
}
)

# Optional graph storage addition and relations
graph_result = None
if hasattr(self, "_add_to_graph"):
graph_result = self._add_to_graph(messages, filters, user_id, agent_id, run_id)

result = {"results": created_results}
if graph_result:
result["relations"] = graph_result

return result

def _intelligent_add(
Expand Down Expand Up @@ -888,65 +903,6 @@ def _add_to_graph(

return self.graph_store.add(data, graph_filters)

def _create_memory(
self,
content: str,
user_id: Optional[str] = None,
agent_id: Optional[str] = None,
run_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
filters: Optional[Dict[str, Any]] = None,
existing_embeddings: Optional[Dict[str, Any]] = None,
) -> int:
"""Create a memory with optional embeddings."""
# Validate content is not empty
if not content or not content.strip():
raise ValueError(f"Cannot create memory with empty content: '{content}'")

# Select embedding service based on metadata (for sub-store routing)
embedding_service = self._get_embedding_service(metadata)

# Generate or use existing embedding
if existing_embeddings and content in existing_embeddings:
embedding = existing_embeddings[content]
else:
embedding = embedding_service.embed(content, memory_action="add")

# Disabled LLM-based importance evaluation to save tokens
# Process metadata
# enhanced_metadata = self.intelligence.process_metadata(content, metadata)
enhanced_metadata = metadata # Use original metadata without LLM evaluation

# Generate content hash
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()

# Extract category
category = ""
if enhanced_metadata and isinstance(enhanced_metadata, dict):
category = enhanced_metadata.get("category", "")
enhanced_metadata = {k: v for k, v in enhanced_metadata.items() if k != "category"}

# Use self.agent_id as fallback if agent_id is not provided
agent_id = agent_id or self.agent_id

# Create memory data
memory_data = {
"content": content,
"embedding": embedding,
"user_id": user_id,
"agent_id": agent_id,
"run_id": run_id,
"hash": content_hash,
"category": category,
"metadata": enhanced_metadata or {},
"filters": filters or {},
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
}

memory_id = self.storage.add_memory(memory_data)

return memory_id

def _update_memory(
self,
Expand Down