diff --git a/crates/pulsing-actor/src/cluster/gossip.rs b/crates/pulsing-actor/src/cluster/gossip.rs index b2b259d5e..7b7bd28f6 100644 --- a/crates/pulsing-actor/src/cluster/gossip.rs +++ b/crates/pulsing-actor/src/cluster/gossip.rs @@ -7,7 +7,8 @@ //! - PFail/Fail failure detection use super::member::{ - ActorLocation, ClusterNode, FailureInfo, MemberInfo, MemberStatus, NamedActorInfo, NodeStatus, + ActorLocation, ClusterNode, FailureInfo, MemberInfo, MemberStatus, NamedActorInfo, + NamedActorInstance, NodeStatus, }; use super::swim::SwimConfig; use crate::actor::{ActorId, ActorPath, NodeId, StopReason}; @@ -127,6 +128,12 @@ pub enum GossipMessage { NamedActorRegistered { path: ActorPath, node_id: NodeId, + /// Actor ID (optional for backward compatibility) + #[serde(default)] + actor_id: Option, + /// Metadata (e.g., Python class, module, file path) + #[serde(default)] + metadata: std::collections::HashMap, }, NamedActorUnregistered { path: ActorPath, @@ -552,13 +559,28 @@ impl GossipCluster { self.state.actors.write().await.remove(&actor_id); Ok(None) } - GossipMessage::NamedActorRegistered { path, node_id } => { + GossipMessage::NamedActorRegistered { + path, + node_id, + actor_id, + metadata, + } => { let mut named = self.state.named_actors.write().await; let key = path.as_str(); - named - .entry(key.clone()) - .and_modify(|info| info.add_instance(node_id)) - .or_insert_with(|| NamedActorInfo::with_instance(path, node_id)); + if let Some(aid) = actor_id { + // Full registration with actor_id and metadata + let instance = NamedActorInstance::with_metadata(node_id, aid, metadata); + named + .entry(key.clone()) + .and_modify(|info| info.add_full_instance(instance.clone())) + .or_insert_with(|| NamedActorInfo::with_full_instance(path, instance)); + } else { + // Legacy registration (no actor_id) + named + .entry(key.clone()) + .and_modify(|info| info.add_instance(node_id)) + .or_insert_with(|| NamedActorInfo::with_instance(path, node_id)); + } Ok(None) } GossipMessage::NamedActorUnregistered { path, node_id } => { @@ -624,6 +646,33 @@ impl GossipCluster { // Named Actor Registry // ======================================================================== + /// Register a named actor with full details (actor_id and metadata) + pub async fn register_named_actor_full( + &self, + path: ActorPath, + actor_id: ActorId, + metadata: std::collections::HashMap, + ) { + let key = path.as_str(); + let instance = + NamedActorInstance::with_metadata(self.state.local_node, actor_id, metadata.clone()); + { + let mut named = self.state.named_actors.write().await; + named + .entry(key.clone()) + .and_modify(|info| info.add_full_instance(instance.clone())) + .or_insert_with(|| NamedActorInfo::with_full_instance(path.clone(), instance)); + } + let msg = GossipMessage::NamedActorRegistered { + path, + node_id: self.state.local_node, + actor_id: Some(actor_id), + metadata, + }; + let _ = self.broadcast(&msg).await; + } + + /// Register a named actor (legacy, without actor_id) pub async fn register_named_actor(&self, path: ActorPath) { let key = path.as_str(); { @@ -638,6 +687,8 @@ impl GossipCluster { let msg = GossipMessage::NamedActorRegistered { path, node_id: self.state.local_node, + actor_id: None, + metadata: std::collections::HashMap::new(), }; let _ = self.broadcast(&msg).await; } @@ -687,21 +738,47 @@ impl GossipCluster { } pub async fn get_named_actor_instances(&self, path: &ActorPath) -> Vec { - let instances = { + let node_ids = { let named = self.state.named_actors.read().await; match named.get(&path.as_str()) { - Some(info) => info.instances.clone(), + Some(info) => info.instance_nodes.clone(), None => return Vec::new(), } }; let nodes = self.state.cluster_nodes.read().await; - instances + node_ids .iter() .filter_map(|id| nodes.get(id).map(ClusterState::node_to_member)) .collect() } + /// Get detailed instance information for a named actor + pub async fn get_named_actor_instances_detailed( + &self, + path: &ActorPath, + ) -> Vec<(MemberInfo, Option)> { + let (node_ids, instances_map) = { + let named = self.state.named_actors.read().await; + match named.get(&path.as_str()) { + Some(info) => (info.instance_nodes.clone(), info.instances.clone()), + None => return Vec::new(), + } + }; + + let nodes = self.state.cluster_nodes.read().await; + node_ids + .iter() + .filter_map(|id| { + nodes.get(id).map(|node| { + let member = ClusterState::node_to_member(node); + let instance = instances_map.get(id).cloned(); + (member, instance) + }) + }) + .collect() + } + pub async fn all_named_actors(&self) -> Vec { self.state .named_actors diff --git a/crates/pulsing-actor/src/cluster/member.rs b/crates/pulsing-actor/src/cluster/member.rs index 46f515b8c..cb41a39e2 100644 --- a/crates/pulsing-actor/src/cluster/member.rs +++ b/crates/pulsing-actor/src/cluster/member.rs @@ -2,7 +2,7 @@ use crate::actor::{ActorId, ActorPath, NodeId}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::time::Instant; @@ -223,6 +223,42 @@ impl ActorLocation { } } +/// Instance details for a named actor on a specific node +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NamedActorInstance { + /// The node ID where this instance is running + pub node_id: NodeId, + /// The actor ID of this instance + pub actor_id: ActorId, + /// Metadata (e.g., Python class, module, file path) + #[serde(default)] + pub metadata: HashMap, +} + +impl NamedActorInstance { + /// Create a new instance with just node_id and actor_id + pub fn new(node_id: NodeId, actor_id: ActorId) -> Self { + Self { + node_id, + actor_id, + metadata: HashMap::new(), + } + } + + /// Create with metadata + pub fn with_metadata( + node_id: NodeId, + actor_id: ActorId, + metadata: HashMap, + ) -> Self { + Self { + node_id, + actor_id, + metadata, + } + } +} + /// Named actor registration info - supports multiple instances /// /// A named actor can have multiple instances across different nodes. @@ -232,8 +268,12 @@ pub struct NamedActorInfo { /// Actor path (namespace/path/name) pub path: ActorPath, - /// All instances (node IDs where this actor is deployed) - pub instances: HashSet, + /// All instances mapped by node_id + pub instances: HashMap, + + /// Legacy: just node IDs for backward compatibility + #[serde(default)] + pub instance_nodes: HashSet, /// Version number for conflict resolution (CRDT-like merge) pub version: u64, @@ -244,32 +284,59 @@ impl NamedActorInfo { pub fn new(path: ActorPath) -> Self { Self { path, - instances: HashSet::new(), + instances: HashMap::new(), + instance_nodes: HashSet::new(), version: 0, } } - /// Create with a single instance + /// Create with a single instance (legacy, no actor_id) pub fn with_instance(path: ActorPath, node_id: NodeId) -> Self { - let mut instances = HashSet::new(); - instances.insert(node_id); + let mut instance_nodes = HashSet::new(); + instance_nodes.insert(node_id); + Self { + path, + instances: HashMap::new(), + instance_nodes, + version: 1, + } + } + + /// Create with full instance details + pub fn with_full_instance(path: ActorPath, instance: NamedActorInstance) -> Self { + let mut instances = HashMap::new(); + let mut instance_nodes = HashSet::new(); + let node_id = instance.node_id; + instance_nodes.insert(node_id); + instances.insert(node_id, instance); Self { path, instances, + instance_nodes, version: 1, } } - /// Add an instance + /// Add an instance (legacy, no actor_id) pub fn add_instance(&mut self, node_id: NodeId) { - if self.instances.insert(node_id) { + if self.instance_nodes.insert(node_id) { self.version += 1; } } + /// Add a full instance with details + pub fn add_full_instance(&mut self, instance: NamedActorInstance) { + let node_id = instance.node_id; + self.instance_nodes.insert(node_id); + self.instances.insert(node_id, instance); + self.version += 1; + } + /// Remove an instance pub fn remove_instance(&mut self, node_id: &NodeId) -> bool { - if self.instances.remove(node_id) { + let removed_node = self.instance_nodes.remove(node_id); + let removed_instance = self.instances.remove(node_id).is_some(); + if removed_node || removed_instance { self.version += 1; true } else { @@ -279,18 +346,32 @@ impl NamedActorInfo { /// Check if the actor has any instances pub fn is_empty(&self) -> bool { - self.instances.is_empty() + self.instance_nodes.is_empty() && self.instances.is_empty() } /// Get the number of instances pub fn instance_count(&self) -> usize { - self.instances.len() + // Use instance_nodes for count (backward compatible) + self.instance_nodes.len() + } + + /// Get all node IDs where this actor has instances + pub fn node_ids(&self) -> impl Iterator { + self.instance_nodes.iter() + } + + /// Get instance details for a node + pub fn get_instance(&self, node_id: &NodeId) -> Option<&NamedActorInstance> { + self.instances.get(node_id) } /// Merge with another NamedActorInfo (union of instances) pub fn merge(&mut self, other: &NamedActorInfo) { - for node_id in &other.instances { - self.instances.insert(*node_id); + for node_id in &other.instance_nodes { + self.instance_nodes.insert(*node_id); + } + for (node_id, instance) in &other.instances { + self.instances.insert(*node_id, instance.clone()); } self.version = self.version.max(other.version) + 1; } @@ -299,7 +380,7 @@ impl NamedActorInfo { pub fn select_instance(&self) -> Option { use rand::prelude::IteratorRandom; let mut rng = rand::rng(); - self.instances.iter().choose(&mut rng).cloned() + self.instance_nodes.iter().choose(&mut rng).cloned() } } @@ -508,7 +589,7 @@ mod tests { assert_eq!(info.instance_count(), 1); assert!(!info.is_empty()); assert_eq!(info.version, 1); - assert!(info.instances.contains(&node_id)); + assert!(info.instance_nodes.contains(&node_id)); } #[test] @@ -570,9 +651,9 @@ mod tests { info1.merge(&info2); assert_eq!(info1.instance_count(), 3); - assert!(info1.instances.contains(&node1)); - assert!(info1.instances.contains(&node2)); - assert!(info1.instances.contains(&node3)); + assert!(info1.instance_nodes.contains(&node1)); + assert!(info1.instance_nodes.contains(&node2)); + assert!(info1.instance_nodes.contains(&node3)); } #[test] @@ -626,4 +707,143 @@ mod tests { assert!(dead.supersedes(&alive)); assert!(!alive.supersedes(&dead)); } + + // ======================================================================== + // NamedActorInstance Tests + // ======================================================================== + + #[test] + fn test_named_actor_instance_new() { + let node_id = NodeId::generate(); + let actor_id = ActorId::local(42); + + let instance = NamedActorInstance::new(node_id, actor_id); + + assert_eq!(instance.node_id, node_id); + assert_eq!(instance.actor_id, actor_id); + assert!(instance.metadata.is_empty()); + } + + #[test] + fn test_named_actor_instance_with_metadata() { + let node_id = NodeId::generate(); + let actor_id = ActorId::local(42); + let mut metadata = HashMap::new(); + metadata.insert("class".to_string(), "Counter".to_string()); + metadata.insert("module".to_string(), "__main__".to_string()); + metadata.insert("file".to_string(), "/app/main.py".to_string()); + + let instance = NamedActorInstance::with_metadata(node_id, actor_id, metadata.clone()); + + assert_eq!(instance.node_id, node_id); + assert_eq!(instance.actor_id, actor_id); + assert_eq!(instance.metadata.get("class"), Some(&"Counter".to_string())); + assert_eq!( + instance.metadata.get("module"), + Some(&"__main__".to_string()) + ); + assert_eq!( + instance.metadata.get("file"), + Some(&"/app/main.py".to_string()) + ); + } + + #[test] + fn test_named_actor_info_with_full_instance() { + let path = ActorPath::new("actors/counter").unwrap(); + let node_id = NodeId::generate(); + let actor_id = ActorId::local(42); + let mut metadata = HashMap::new(); + metadata.insert("class".to_string(), "Counter".to_string()); + + let instance = NamedActorInstance::with_metadata(node_id, actor_id, metadata); + let info = NamedActorInfo::with_full_instance(path.clone(), instance); + + assert_eq!(info.path, path); + assert_eq!(info.instance_count(), 1); + assert!(info.instance_nodes.contains(&node_id)); + assert!(info.instances.contains_key(&node_id)); + + let retrieved = info.get_instance(&node_id).unwrap(); + assert_eq!(retrieved.actor_id, actor_id); + assert_eq!( + retrieved.metadata.get("class"), + Some(&"Counter".to_string()) + ); + } + + #[test] + fn test_named_actor_info_add_full_instance() { + let path = ActorPath::new("actors/counter").unwrap(); + let node1 = NodeId::generate(); + let node2 = NodeId::generate(); + let actor_id1 = ActorId::local(1); + let actor_id2 = ActorId::local(2); + + let mut info = NamedActorInfo::new(path); + + let instance1 = NamedActorInstance::new(node1, actor_id1); + info.add_full_instance(instance1); + assert_eq!(info.instance_count(), 1); + + let instance2 = NamedActorInstance::new(node2, actor_id2); + info.add_full_instance(instance2); + assert_eq!(info.instance_count(), 2); + + assert!(info.get_instance(&node1).is_some()); + assert!(info.get_instance(&node2).is_some()); + assert_eq!(info.get_instance(&node1).unwrap().actor_id, actor_id1); + assert_eq!(info.get_instance(&node2).unwrap().actor_id, actor_id2); + } + + #[test] + fn test_named_actor_info_get_instance_not_found() { + let path = ActorPath::new("actors/counter").unwrap(); + let node_id = NodeId::generate(); + + let info = NamedActorInfo::new(path); + + assert!(info.get_instance(&node_id).is_none()); + } + + #[test] + fn test_named_actor_info_merge_with_full_instances() { + let path = ActorPath::new("actors/counter").unwrap(); + let node1 = NodeId::generate(); + let node2 = NodeId::generate(); + let actor_id1 = ActorId::local(1); + let actor_id2 = ActorId::local(2); + + let mut metadata1 = HashMap::new(); + metadata1.insert("class".to_string(), "Counter".to_string()); + let instance1 = NamedActorInstance::with_metadata(node1, actor_id1, metadata1); + let mut info1 = NamedActorInfo::with_full_instance(path.clone(), instance1); + + let mut metadata2 = HashMap::new(); + metadata2.insert("class".to_string(), "Counter".to_string()); + let instance2 = NamedActorInstance::with_metadata(node2, actor_id2, metadata2); + let info2 = NamedActorInfo::with_full_instance(path.clone(), instance2); + + info1.merge(&info2); + + assert_eq!(info1.instance_count(), 2); + assert!(info1.get_instance(&node1).is_some()); + assert!(info1.get_instance(&node2).is_some()); + } + + #[test] + fn test_named_actor_info_node_ids_iterator() { + let path = ActorPath::new("actors/counter").unwrap(); + let node1 = NodeId::generate(); + let node2 = NodeId::generate(); + + let mut info = NamedActorInfo::new(path); + info.add_instance(node1); + info.add_instance(node2); + + let node_ids: Vec<_> = info.node_ids().collect(); + assert_eq!(node_ids.len(), 2); + assert!(node_ids.contains(&&node1)); + assert!(node_ids.contains(&&node2)); + } } diff --git a/crates/pulsing-actor/src/cluster/mod.rs b/crates/pulsing-actor/src/cluster/mod.rs index c3bbad682..df84b0810 100644 --- a/crates/pulsing-actor/src/cluster/mod.rs +++ b/crates/pulsing-actor/src/cluster/mod.rs @@ -10,5 +10,5 @@ mod member; pub mod swim; pub use gossip::{GossipCluster, GossipConfig, GossipMessage}; -pub use member::{ActorLocation, MemberInfo, MemberStatus, NamedActorInfo}; +pub use member::{ActorLocation, MemberInfo, MemberStatus, NamedActorInfo, NamedActorInstance}; pub use swim::{SwimConfig, SwimDetector, SwimMessage}; diff --git a/crates/pulsing-actor/src/system.rs b/crates/pulsing-actor/src/system.rs index 80cb82e96..a65c687e0 100644 --- a/crates/pulsing-actor/src/system.rs +++ b/crates/pulsing-actor/src/system.rs @@ -6,6 +6,7 @@ use crate::actor::{ }; use crate::cluster::{ GossipCluster, GossipConfig, GossipMessage, MemberInfo, MemberStatus, NamedActorInfo, + NamedActorInstance, }; use crate::metrics::{metrics, SystemMetrics as PrometheusMetrics}; use crate::supervision::SupervisionSpec; @@ -135,6 +136,8 @@ pub struct SpawnOptions { pub public: bool, /// Supervision specification (restart policy) pub supervision: SupervisionSpec, + /// Actor metadata (e.g., Python class, module, file path) + pub metadata: HashMap, } impl SpawnOptions { @@ -160,6 +163,12 @@ impl SpawnOptions { self.supervision = supervision; self } + + /// Set actor metadata + pub fn metadata(mut self, metadata: HashMap) -> Self { + self.metadata = metadata; + self + } } /// Load balance strategy for resolving named actors with multiple instances @@ -610,7 +619,7 @@ impl ActorSystem { let (sender, receiver) = mailbox.split(); let stats = Arc::new(ActorStats::default()); - let metadata = HashMap::new(); + let metadata = options.metadata.clone(); // Create context let ctx = ActorContext::new(actor_id); @@ -633,7 +642,7 @@ impl ActorSystem { sender: sender.clone(), join_handle, stats: stats.clone(), - metadata, + metadata: metadata.clone(), named_path: Some(path.clone()), actor_id, }; @@ -642,9 +651,15 @@ impl ActorSystem { self.named_actor_paths .insert(path.as_str().to_string(), local_name.to_string()); - // Register in cluster + // Register in cluster with full details if let Some(cluster) = self.cluster.read().await.as_ref() { - cluster.register_named_actor(path.clone()).await; + if metadata.is_empty() { + cluster.register_named_actor(path.clone()).await; + } else { + cluster + .register_named_actor_full(path.clone(), actor_id, metadata) + .await; + } } // Create ActorRef @@ -847,6 +862,19 @@ impl ActorSystem { } } + /// Get detailed instances with actor_id and metadata + pub async fn get_named_instances_detailed( + &self, + path: &ActorPath, + ) -> Vec<(MemberInfo, Option)> { + let cluster_guard = self.cluster.read().await; + if let Some(cluster) = cluster_guard.as_ref() { + cluster.get_named_actor_instances_detailed(path).await + } else { + Vec::new() + } + } + /// Lookup named actor information pub async fn lookup_named(&self, path: &ActorPath) -> Option { let cluster_guard = self.cluster.read().await; @@ -1405,4 +1433,84 @@ impl Http2ServerHandler for SystemMessageHandler { // Export using global metrics registry metrics().export_prometheus(&system_metrics) } + + async fn cluster_members(&self) -> serde_json::Value { + let cluster_guard = self.cluster.read().await; + if let Some(cluster) = cluster_guard.as_ref() { + let members = cluster.all_members().await; + let result: Vec<_> = members + .iter() + .map(|m| { + serde_json::json!({ + "node_id": m.node_id.to_string(), + "addr": m.addr.to_string(), + "status": format!("{:?}", m.status), + }) + }) + .collect(); + serde_json::json!(result) + } else { + // Single node mode - return empty (no cluster) + serde_json::json!([{ + "node_id": self.node_id.to_string(), + "status": "Alive", + }]) + } + } + + async fn actors_list(&self, include_internal: bool) -> serde_json::Value { + let cluster_guard = self.cluster.read().await; + let all_named = if let Some(cluster) = cluster_guard.as_ref() { + cluster.all_named_actors().await + } else { + Vec::new() + }; + drop(cluster_guard); + + // Build actors list with detailed info + let mut actors = Vec::new(); + for info in all_named { + let path_str = info.path.as_str(); + + // Skip system/core + if path_str == "system/core" { + continue; + } + + // Check if this actor is on this node + if !info.instance_nodes.contains(&self.node_id) { + continue; + } + + let name = path_str.strip_prefix("actors/").unwrap_or(&path_str); + + // Skip internal actors unless requested + if !include_internal && name.starts_with('_') { + continue; + } + + let actor_type = if name.starts_with('_') { + "system" + } else { + "user" + }; + + // Get detailed instance info if available + let mut actor_json = serde_json::json!({ + "name": name, + "type": actor_type, + }); + + if let Some(instance) = info.get_instance(&self.node_id) { + actor_json["actor_id"] = serde_json::json!(instance.actor_id.to_string()); + for (k, v) in &instance.metadata { + actor_json[k] = serde_json::json!(v); + } + } + + actors.push(actor_json); + } + + serde_json::json!(actors) + } } diff --git a/crates/pulsing-actor/src/system_actor/messages.rs b/crates/pulsing-actor/src/system_actor/messages.rs index 677ecb67a..6e76eecc4 100644 --- a/crates/pulsing-actor/src/system_actor/messages.rs +++ b/crates/pulsing-actor/src/system_actor/messages.rs @@ -154,6 +154,9 @@ pub struct ActorInfo { pub uptime_secs: u64, /// Whether public pub public: bool, + /// Actor metadata (e.g., Python class info) + #[serde(default)] + pub metadata: std::collections::HashMap, } /// Actor status info @@ -207,6 +210,7 @@ mod tests { actor_type: "TestActor".to_string(), uptime_secs: 60, public: true, + metadata: std::collections::HashMap::new(), }; let json = serde_json::to_string(&info).unwrap(); assert!(json.contains("test")); diff --git a/crates/pulsing-actor/src/system_actor/mod.rs b/crates/pulsing-actor/src/system_actor/mod.rs index d08caa41a..9127ea5a8 100644 --- a/crates/pulsing-actor/src/system_actor/mod.rs +++ b/crates/pulsing-actor/src/system_actor/mod.rs @@ -138,6 +138,7 @@ impl ActorRegistry { actor_type: e.actor_type.clone(), uptime_secs: e.created_at.elapsed().as_secs(), public: e.public, + metadata: std::collections::HashMap::new(), // TODO: get from actor }) .collect() } @@ -149,6 +150,7 @@ impl ActorRegistry { actor_type: e.actor_type.clone(), uptime_secs: e.created_at.elapsed().as_secs(), public: e.public, + metadata: std::collections::HashMap::new(), // TODO: get from actor }) } } diff --git a/crates/pulsing-actor/src/tracing/mod.rs b/crates/pulsing-actor/src/tracing/mod.rs index fe5cf92de..f45486540 100644 --- a/crates/pulsing-actor/src/tracing/mod.rs +++ b/crates/pulsing-actor/src/tracing/mod.rs @@ -65,7 +65,11 @@ impl Default for TracingConfig { otlp_endpoint: None, sampling_ratio: 1.0, console_output: true, - log_filter: "info".to_string(), + // Filter out HTTP request/client logs and gossip logs by default to avoid span ID spam + // Users can enable with RUST_LOG=pulsing_actor::transport=debug,pulsing_actor::cluster=debug + log_filter: + "info,pulsing_actor::transport::http2=warn,pulsing_actor::cluster::gossip=warn" + .to_string(), } } } @@ -108,11 +112,13 @@ pub fn init_tracing(config: TracingConfig) -> anyhow::Result<()> { let tracer = provider.tracer("pulsing"); if config.console_output { + // Use compact format to avoid printing nested span contexts let fmt_layer = tracing_subscriber::fmt::layer() .with_target(true) .with_thread_ids(false) .with_file(false) - .with_line_number(false); + .with_line_number(false) + .compact(); // Compact format reduces span field verbosity let subscriber = tracing_subscriber::registry() .with(env_filter) diff --git a/crates/pulsing-actor/src/transport/http2/server.rs b/crates/pulsing-actor/src/transport/http2/server.rs index 6213eb926..9f1101621 100644 --- a/crates/pulsing-actor/src/transport/http2/server.rs +++ b/crates/pulsing-actor/src/transport/http2/server.rs @@ -82,6 +82,21 @@ pub trait Http2ServerHandler: Send + Sync + 'static { async fn prometheus_metrics(&self) -> String { String::new() } + + /// Get cluster members list (for CLI tools) + /// + /// Returns JSON array of member information. + async fn cluster_members(&self) -> serde_json::Value { + serde_json::json!([]) + } + + /// Get actors list on this node (for CLI tools) + /// + /// Returns JSON array of actor information. + async fn actors_list(&self, include_internal: bool) -> serde_json::Value { + let _ = include_internal; + serde_json::json!([]) + } } /// HTTP/2 Server @@ -349,6 +364,34 @@ impl Http2Server { .unwrap()); } + // Cluster members endpoint (for CLI tools) + if path == "/cluster/members" && method == Method::GET { + let members = handler.cluster_members().await; + let body = serde_json::to_vec(&members).unwrap_or_default(); + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/json") + .body(full_body(body)) + .unwrap()); + } + + // Actors list endpoint (for CLI tools) + if (path == "/actors" || path == "/actors/") && method == Method::GET { + // Check for ?all=true query parameter + let include_internal = req + .uri() + .query() + .map(|q| q.contains("all=true")) + .unwrap_or(false); + let actors = handler.actors_list(include_internal).await; + let body = serde_json::to_vec(&actors).unwrap_or_default(); + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/json") + .body(full_body(body)) + .unwrap()); + } + // Only POST for actor messages if method != Method::POST { return Ok(Response::builder() diff --git a/crates/pulsing-actor/tests/http2_transport_tests.rs b/crates/pulsing-actor/tests/http2_transport_tests.rs index 6ede3dda2..4c159ff22 100644 --- a/crates/pulsing-actor/tests/http2_transport_tests.rs +++ b/crates/pulsing-actor/tests/http2_transport_tests.rs @@ -84,6 +84,50 @@ impl Http2ServerHandler for TestHandler { "tell_count": self.counters.tell_count.load(Ordering::SeqCst), }) } + + async fn cluster_members(&self) -> serde_json::Value { + serde_json::json!([ + { + "node_id": "12345", + "addr": "127.0.0.1:8001", + "status": "Alive" + }, + { + "node_id": "67890", + "addr": "127.0.0.1:8002", + "status": "Alive" + } + ]) + } + + async fn actors_list(&self, include_internal: bool) -> serde_json::Value { + let mut actors = vec![ + serde_json::json!({ + "name": "counter-1", + "type": "user", + "actor_id": "12345:1", + "class": "Counter", + "module": "__main__" + }), + serde_json::json!({ + "name": "calculator", + "type": "user", + "actor_id": "12345:2", + "class": "Calculator", + "module": "__main__" + }), + ]; + + if include_internal { + actors.push(serde_json::json!({ + "name": "_python_actor_service", + "type": "system", + "actor_id": "12345:0" + })); + } + + serde_json::json!(actors) + } } // ============================================================================ @@ -1400,6 +1444,112 @@ mod tracing_tests { } } +// ============================================================================ +// REST API Endpoint Tests +// ============================================================================ + +mod rest_api_tests { + use super::*; + + /// Test /cluster/members endpoint + #[tokio::test] + async fn test_cluster_members_endpoint() { + let handler = Arc::new(TestHandler::new()); + let cancel = CancellationToken::new(); + + let _server = Http2Server::new( + "127.0.0.1:0".parse().unwrap(), + handler.clone(), + Http2Config::default(), + cancel.clone(), + ) + .await + .unwrap(); + + // Test cluster members endpoint + let members = handler.cluster_members().await; + assert!(members.is_array()); + let members_array = members.as_array().unwrap(); + assert_eq!(members_array.len(), 2); + + // Verify member structure + let member = &members_array[0]; + assert!(member.get("node_id").is_some()); + assert!(member.get("addr").is_some()); + assert!(member.get("status").is_some()); + + cancel.cancel(); + } + + /// Test /actors endpoint + #[tokio::test] + async fn test_actors_list_endpoint() { + let handler = Arc::new(TestHandler::new()); + let cancel = CancellationToken::new(); + + let _server = Http2Server::new( + "127.0.0.1:0".parse().unwrap(), + handler.clone(), + Http2Config::default(), + cancel.clone(), + ) + .await + .unwrap(); + + // Test actors list (user only) + let actors = handler.actors_list(false).await; + assert!(actors.is_array()); + let actors_array = actors.as_array().unwrap(); + assert_eq!(actors_array.len(), 2); // Only user actors + + // Verify actor structure + let actor = &actors_array[0]; + assert!(actor.get("name").is_some()); + assert!(actor.get("type").is_some()); + assert_eq!(actor.get("type").unwrap(), "user"); + + // Test actors list (include internal) + let all_actors = handler.actors_list(true).await; + let all_actors_array = all_actors.as_array().unwrap(); + assert_eq!(all_actors_array.len(), 3); // Includes system actor + + cancel.cancel(); + } + + /// Test actor metadata in actors list + #[tokio::test] + async fn test_actors_list_metadata() { + let handler = Arc::new(TestHandler::new()); + + // Test actors list has metadata + let actors = handler.actors_list(false).await; + let actors_array = actors.as_array().unwrap(); + let actor = &actors_array[0]; + + // Verify metadata fields + assert!(actor.get("actor_id").is_some()); + assert!(actor.get("class").is_some()); + assert!(actor.get("module").is_some()); + + // Verify values + assert_eq!(actor.get("class").unwrap(), "Counter"); + assert_eq!(actor.get("module").unwrap(), "__main__"); + } + + /// Test health check endpoint returns expected structure + #[tokio::test] + async fn test_health_check_endpoint() { + let handler = Arc::new(TestHandler::new()); + + let health = handler.health_check().await; + + assert!(health.get("status").is_some()); + assert_eq!(health.get("status").unwrap(), "healthy"); + assert!(health.get("ask_count").is_some()); + assert!(health.get("tell_count").is_some()); + } +} + // ============================================================================ // TLS Tests (requires `tls` feature) // ============================================================================ diff --git a/crates/pulsing-actor/tests/system_actor_tests.rs b/crates/pulsing-actor/tests/system_actor_tests.rs index 605b87011..4d4390733 100644 --- a/crates/pulsing-actor/tests/system_actor_tests.rs +++ b/crates/pulsing-actor/tests/system_actor_tests.rs @@ -166,6 +166,7 @@ fn test_actor_info_serialization() { actor_type: "TestActor".to_string(), uptime_secs: 60, public: true, + metadata: std::collections::HashMap::new(), }; let json = serde_json::to_string(&info).unwrap(); assert!(json.contains("test")); diff --git a/crates/pulsing-py/src/actor.rs b/crates/pulsing-py/src/actor.rs index 80d5118c8..e1bee4a5a 100644 --- a/crates/pulsing-py/src/actor.rs +++ b/crates/pulsing-py/src/actor.rs @@ -707,6 +707,43 @@ impl Actor for PythonActorWrapper { fn metadata(&self) -> std::collections::HashMap { Python::with_gil(|py| { let mut result = std::collections::HashMap::new(); + + // First, try to extract built-in Python class information + if let Ok(class) = self.handler.getattr(py, "__class__") { + // Get class name + if let Ok(name) = class.getattr(py, "__name__") { + if let Ok(name_str) = name.extract::(py) { + result.insert("python_class".to_string(), name_str); + } + } + + // Get module name + if let Ok(module) = class.getattr(py, "__module__") { + if let Ok(module_str) = module.extract::(py) { + result.insert("python_module".to_string(), module_str); + } + } + + // Get source file path + if let Ok(module_name) = class.getattr(py, "__module__") { + if let Ok(module_str) = module_name.extract::(py) { + // Try to get the module and its file path + if let Ok(sys) = py.import("sys") { + if let Ok(modules) = sys.getattr("modules") { + if let Ok(module_obj) = modules.get_item(module_str.as_str()) { + if let Ok(file_attr) = module_obj.getattr("__file__") { + if let Ok(file_path) = file_attr.extract::() { + result.insert("python_file".to_string(), file_path); + } + } + } + } + } + } + } + } + + // Then, check if the actor has custom metadata attribute if let Ok(metadata_attr) = self.handler.getattr(py, "metadata") { let bound = metadata_attr.bind(py); let value = if bound.is_callable() { @@ -724,6 +761,7 @@ impl Actor for PythonActorWrapper { } } } + result }) } @@ -983,10 +1021,76 @@ impl PyActorSystem { } }; + // Extract Python class metadata + let metadata = Python::with_gil(|py| { + let mut meta = std::collections::HashMap::new(); + + // Try to get original class info from _WrappedActor first + // This handles the @remote decorator case where we wrap user classes + let (module, qualname, file) = { + // Check for __original_module__, __original_qualname__, __original_file__ + let orig_module = handler + .getattr(py, "__original_module__") + .ok() + .and_then(|m| m.extract::(py).ok()); + let orig_qualname = handler + .getattr(py, "__original_qualname__") + .ok() + .and_then(|q| q.extract::(py).ok()); + let orig_file = handler + .getattr(py, "__original_file__") + .ok() + .and_then(|f| f.extract::(py).ok()); + + if orig_module.is_some() || orig_qualname.is_some() { + (orig_module, orig_qualname, orig_file) + } else { + // Fallback to regular class info + let class = handler + .getattr(py, "__class__") + .unwrap_or_else(|_| handler.clone_ref(py)); + + let module = class + .getattr(py, "__module__") + .ok() + .and_then(|m| m.extract::(py).ok()); + let qualname = class + .getattr(py, "__qualname__") + .ok() + .and_then(|q| q.extract::(py).ok()); + + // Get __file__ from module + let file = module.as_ref().and_then(|module_str| { + py.import("sys") + .ok() + .and_then(|sys| sys.getattr("modules").ok()) + .and_then(|modules| modules.get_item(module_str).ok()) + .and_then(|mod_obj| mod_obj.getattr("__file__").ok()) + .and_then(|f| f.extract::().ok()) + }); + + (module, qualname, file) + } + }; + + if let Some(m) = module { + meta.insert("module".to_string(), m); + } + if let Some(q) = qualname { + meta.insert("class".to_string(), q); + } + if let Some(f) = file { + meta.insert("file".to_string(), f); + } + + meta + }); + pyo3_async_runtimes::tokio::future_into_py(py, async move { let options = pulsing_actor::system::SpawnOptions::new() .public(public) - .supervision(supervision); + .supervision(supervision) + .metadata(metadata); let actor_ref = if matches!(policy, RestartPolicy::Never) { // handler is the instance @@ -1069,7 +1173,7 @@ impl PyActorSystem { self.inner.local_actor_names() } - /// Get all instances of a named actor across the cluster + /// Get all instances of a named actor across the cluster (with detailed info) fn get_named_instances<'py>( &self, py: Python<'py>, @@ -1079,19 +1183,45 @@ impl PyActorSystem { pyo3_async_runtimes::tokio::future_into_py(py, async move { let path = ActorPath::new(format!("actors/{}", name)).map_err(to_pyerr)?; - let instances: Vec = - system.get_named_instances(&path).await; - let result: Vec> = instances + let instances = system.get_named_instances_detailed(&path).await; + let result: Vec> = instances .into_iter() - .map(|m| { + .map(|(member, instance_opt)| { let mut map = std::collections::HashMap::new(); - map.insert("node_id".to_string(), m.node_id.to_string()); - map.insert("addr".to_string(), m.addr.to_string()); - map.insert("status".to_string(), format!("{:?}", m.status)); + map.insert( + "node_id".to_string(), + serde_json::Value::String(member.node_id.to_string()), + ); + map.insert( + "addr".to_string(), + serde_json::Value::String(member.addr.to_string()), + ); + map.insert( + "status".to_string(), + serde_json::Value::String(format!("{:?}", member.status)), + ); + + // Add detailed instance info if available + if let Some(inst) = instance_opt { + map.insert( + "actor_id".to_string(), + serde_json::Value::String(inst.actor_id.to_string()), + ); + // Add metadata fields + for (k, v) in inst.metadata { + map.insert(k, serde_json::Value::String(v)); + } + } + map }) .collect(); - Ok(result) + + Python::with_gil(|py| -> PyResult { + use pythonize::pythonize; + let pyobj = pythonize(py, &result)?; + Ok(pyobj.into()) + }) }) } @@ -1118,13 +1248,43 @@ impl PyActorSystem { info.instance_count(), )), ); - // Convert instances (HashSet) to list of node IDs as strings + // Convert instance_nodes (HashSet) to list of node IDs as strings let instances: Vec = info - .instances + .instance_nodes .iter() .map(|id| serde_json::Value::String(id.to_string())) .collect(); map.insert("instances".to_string(), serde_json::Value::Array(instances)); + + // Add detailed instance info if available + let detailed: Vec = info + .instances + .iter() + .map(|(node_id, inst)| { + let mut inst_map = serde_json::Map::new(); + inst_map.insert( + "node_id".to_string(), + serde_json::Value::String(node_id.to_string()), + ); + inst_map.insert( + "actor_id".to_string(), + serde_json::Value::String(inst.actor_id.to_string()), + ); + // Add metadata + for (k, v) in &inst.metadata { + inst_map + .insert(k.clone(), serde_json::Value::String(v.clone())); + } + serde_json::Value::Object(inst_map) + }) + .collect(); + if !detailed.is_empty() { + map.insert( + "detailed_instances".to_string(), + serde_json::Value::Array(detailed), + ); + } + map }) .collect(); diff --git a/docs/actor-list-guide.md b/docs/actor-list-guide.md new file mode 100644 index 000000000..7402674b5 --- /dev/null +++ b/docs/actor-list-guide.md @@ -0,0 +1,123 @@ +# Actor List 命令使用指南 + +`pulsing actor list` 命令用于列出当前 Actor 系统中的 actors。 + +## 基本用法 + +### 列出用户 actors(默认) + +```bash +pulsing actor list +``` + +默认情况下,只显示用户创建的命名 actors,不包括以 `_` 开头的系统内部 actors。 + +输出示例: + +``` +Name Type Uptime Code Path +--------------------------------------------------------------------------------------------------- +counter-1 user 5m 23s - +counter-2 user 5m 23s - +calculator user 5m 23s - + +Total: 3 actor(s) +``` + +### 列出所有 actors(包括系统 actors) + +```bash +pulsing actor list --all_actors True +``` + +包括系统内部的 actors: + +``` +Name Type Uptime Code Path +--------------------------------------------------------------------------------------------------- +counter-1 user 5m 23s - +_system_internal system 5m 30s - +_python_actor_service system 5m 30s - + +Total: 5 actor(s) +``` + +### JSON 输出格式 + +```bash +pulsing actor list --json True +``` + +以 JSON 格式输出,方便脚本处理: + +```json +[ + { + "name": "counter-1", + "type": "user", + "code_path": null, + "uptime": "5m 23s" + }, + { + "name": "counter-2", + "type": "user", + "code_path": null, + "uptime": "5m 23s" + } +] +``` + +## 在 Python 代码中使用 + +`pulsing actor list` CLI 命令需要在运行 actor system 的进程内调用。更常见的用法是直接在 Python 代码中使用: + +```python +import asyncio +from pulsing.actor import init, remote, get_system +from pulsing.cli.actor_list import list_actors_impl + + +@remote +class Counter: + def __init__(self): + self.count = 0 + + +async def main(): + # 初始化系统 + await init() + system = get_system() + + # 创建一些 actors + await Counter.remote(system, name="counter-1") + await Counter.remote(system, name="counter-2") + + # 列出 actors + await list_actors_impl(all_actors=False, output_format="table") + + # 或者直接使用底层 API + actor_names = system.local_actor_names() + user_actors = [n for n in actor_names if not n.startswith("_")] + print(f"User actors: {user_actors}") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## 字段说明 + +- **Name**: Actor 的名称 +- **Type**: Actor 类型 + - `user`: 用户创建的 actors + - `system`: 系统内部 actors +- **Uptime**: Actor 运行时间(当前为系统启动时间的近似值) +- **Code Path**: Python 类的代码路径(当前版本暂未实现,显示为 `-`) + +## 未来改进 + +- [ ] 显示每个 actor 的精确创建时间/运行时间 +- [ ] 显示 Python actor 的类型(类名)和代码路径 +- [ ] 显示 actor 的消息处理统计(处理数量、错误数等) +- [ ] 支持通过 `--seeds` 参数查询远程集群的 actors +- [ ] 支持过滤和搜索(按名称、类型等) diff --git a/docs/actor-list-implementation.md b/docs/actor-list-implementation.md new file mode 100644 index 000000000..b9ea890ef --- /dev/null +++ b/docs/actor-list-implementation.md @@ -0,0 +1,247 @@ +# Pulsing Actor List 完整实现总结 + +## ✅ 已完成功能 + +### 1. 本地查询模式 +在运行 actor system 的进程内查询 actors: + +```bash +# 在应用代码中 +from pulsing.cli.actor_list import list_actors_impl +await list_actors_impl() + +# 或在同一进程中作为 Python API +from pulsing.actor import get_system +names = get_system().local_actor_names() +``` + +**功能:** +- ✅ 列出用户 actors(默认) +- ✅ 列出所有 actors(`--all_actors True`) +- ✅ 显示 Python 类名(如 `__main__.Counter`) +- ✅ 显示代码路径(如 `/path/to/file.py`) +- ✅ 表格和 JSON 输出格式 + +### 2. 远程查询模式 +从外部连接到远程集群并查询 actors: + +```bash +# 查询整个集群 +pulsing actor list --list_seeds "127.0.0.1:8000" + +# 查询特定节点 +pulsing actor list --list_seeds "127.0.0.1:8000" --node_id 12345 + +# JSON 输出 +pulsing actor list --list_seeds "127.0.0.1:8000" --json True +``` + +**功能:** +- ✅ 通过 seeds 连接远程集群 +- ✅ 自动发现集群中的所有节点 +- ✅ 查询每个节点的 actors +- ✅ 显示节点状态和响应性 +- ✅ 支持查询特定节点(`--node_id`) + +## 实现架构 + +### 组件层次 + +``` +┌─────────────────────────────────────────┐ +│ CLI: pulsing actor list │ +│ (python/pulsing/cli/__main__.py) │ +└────────────┬────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────┐ +│ list_actors_command() │ +│ (python/pulsing/cli/actor_list.py) │ +│ - 解析参数 │ +│ - 选择本地/远程模式 │ +└────────────┬────────────────────────────┘ + │ + ┌──────┴──────┐ + ▼ ▼ +┌───────────┐ ┌──────────────────┐ +│ 本地模式 │ │ 远程模式 │ +│ │ │ │ +│ get_ │ │ create_actor_ │ +│ system() │ │ system(seeds) │ +│ │ │ │ +│ local_ │ │ all_named_ │ +│ actor_ │ │ actors() │ +│ names() │ │ │ +└───────────┘ └──────────────────┘ + │ │ + ▼ ▼ +┌─────────────────────────────────────────┐ +│ Metadata Registry │ +│ (_actor_metadata_registry) │ +│ - Python class name │ +│ - Source file path │ +│ - Module name │ +└─────────────────────────────────────────┘ +``` + +### 关键代码位置 + +1. **Rust 侧元信息提取** (`crates/pulsing-py/src/actor.rs`) + ```rust + impl Actor for PythonActorWrapper { + fn metadata(&self) -> HashMap { + // 自动提取 __class__, __module__, __file__ + } + } + ``` + +2. **Python 侧元信息注册** (`python/pulsing/actor/remote.py`) + ```python + def _register_actor_metadata(name: str, cls: type): + """在创建 actor 时注册类型信息""" + + def get_actor_metadata(name: str) -> dict[str, str] | None: + """查询 actor 的元信息""" + ``` + +3. **CLI 实现** (`python/pulsing/cli/actor_list.py`) + - `list_actors_impl()`: 核心查询逻辑 + - `_list_remote_node_actors()`: 远程节点查询 + - `_print_actors_output()`: 格式化输出 + +## 输出示例 + +### 本地查询(表格格式) +``` +Name Type Class Code Path +---------------------------------------------------------------------------------------------------------------------------------- +counter-1 user __main__.Counter /tmp/demo.py +counter-2 user __main__.Counter /tmp/demo.py +calculator user __main__.Calculator /tmp/demo.py + +Total: 3 actor(s) +``` + +### 远程查询(多节点) +``` +Connecting to cluster via seeds: ['127.0.0.1:9001']... +Found 2 nodes in cluster + +================================================================================ +Node 12345 (127.0.0.1:9001) - Status: Alive +================================================================================ + Node is responsive (ping: 1234567890) + Name Type Class Code Path + ---------------------------------------------------------------------------------------------------------------------------------- + service-a-1 user - - + service-a-2 user - - + + Total: 2 actor(s) + +================================================================================ +Node 67890 (127.0.0.1:9002) - Status: Alive +================================================================================ + Node is responsive (ping: 1234567891) + Name Type Class Code Path + ---------------------------------------------------------------------------------------------------------------------------------- + service-b-1 user - - + service-b-2 user - - + service-b-3 user - - + + Total: 3 actor(s) +``` + +## 使用场景 + +### 场景 1: 开发调试 +在应用内部快速查看创建了哪些 actors: + +```python +from pulsing.actor import init, remote, get_system +from pulsing.cli.actor_list import list_actors_impl + +await init() +# ... 创建 actors ... + +# 查看当前 actors +await list_actors_impl() +``` + +### 场景 2: 运维监控 +从外部查看生产集群的 actors 分布: + +```bash +# 查看整个集群 +pulsing actor list --list_seeds "prod-node-1:8000" + +# 查看特定节点 +pulsing actor list --list_seeds "prod-node-1:8000" --node_id 12345 + +# 导出为 JSON 供监控系统使用 +pulsing actor list --list_seeds "prod-node-1:8000" --json True > actors.json +``` + +### 场景 3: 集群诊断 +结合 `pulsing inspect` 使用,全面了解集群状态: + +```bash +# 先查看集群拓扑 +pulsing inspect --seeds "127.0.0.1:8000" + +# 再查看详细的 actor 列表 +pulsing actor list --list_seeds "127.0.0.1:8000" --all_actors True +``` + +## 局限性和未来改进 + +### 当前局限 + +1. **远程元信息缺失**:查询远程节点时,无法获取 Python 类名和代码路径 + - 原因:metadata 存储在本地进程内存中 + - 影响:远程查询只能看到 actor 名字 + +2. **Uptime 精度**:当前显示的是系统 uptime,不是单个 actor 的创建时间 + - 原因:ActorRegistry 存储创建时间,但 local_actor_names() 不返回 + +3. **性能**:查询大集群时需要逐个 ping 节点 + - 可能的优化:并发查询、缓存结果 + +### 建议改进(优先级从高到低) + +- [ ] **P1**: 在 Rust 的 ActorRegistry 中存储并返回 metadata + - 让远程查询也能看到类型信息 + +- [ ] **P2**: 添加每个 actor 的精确 uptime + - 修改 `local_actor_names()` 返回更详细信息 + +- [ ] **P2**: 添加消息统计(处理量、错误率等) + - 从 metrics 系统获取 + +- [ ] **P3**: 支持过滤和搜索 + - 按名称、类型、节点等过滤 + +- [ ] **P3**: 交互式模式(实时刷新) + - 类似 `top` 命令的体验 + +## 测试 + +```bash +# 运行测试 +cd /Users/reiase/workspace/Pulsing +PYTHONPATH=python pyenv exec python -m pytest tests/python/test_actor_list.py -v + +# 运行演示 +bash examples/bash/demo_actor_list.sh +bash examples/bash/demo_actor_list_remote.sh +``` + +## 相关文件 + +- `python/pulsing/cli/actor_list.py` - 核心实现 +- `python/pulsing/cli/__main__.py` - CLI 集成 +- `python/pulsing/actor/remote.py` - 元信息注册 +- `crates/pulsing-py/src/actor.rs` - Rust 元信息提取 +- `tests/python/test_actor_list.py` - 测试用例 +- `examples/bash/demo_actor_list.sh` - 本地演示 +- `examples/bash/demo_actor_list_remote.sh` - 远程演示 +- `docs/actor-list-guide.md` - 用户文档 diff --git a/examples/bash/README.md b/examples/bash/README.md new file mode 100644 index 000000000..3bf59e9bf --- /dev/null +++ b/examples/bash/README.md @@ -0,0 +1,107 @@ +# Bash 示例脚本 + +这个目录包含用于测试和演示 Pulsing 功能的 Bash 脚本。 + +## 脚本列表 + +### `demo_actor_list.sh` + +演示如何在应用中使用 actor list 功能查看当前运行的 actors。 + +**功能演示:** +- 在应用启动后查看 actor 列表 +- 默认模式:只显示用户创建的 actors +- `--all_actors` 模式:显示所有 actors(包括系统内部 actors) +- JSON 输出格式 +- 底层 API 使用(`system.local_actor_names()`) + +**使用方法:** + +```bash +cd examples/bash +./demo_actor_list.sh +``` + +或从项目根目录: + +```bash +bash examples/bash/demo_actor_list.sh +``` + +**输出示例:** + +``` +====================================================================== + Pulsing Actor List 演示 +====================================================================== + +Python: Python 3.12.11 + +运行演示... + +================================================================================ +演示:在应用中使用 pulsing actor list +================================================================================ + +1. 初始化 actor system... + ✓ 系统启动: 0.0.0.0:49724 + +2. 创建业务 actors... + ✓ 创建了 3 个 actors + +3. 使用 Python API 查看 actors: + ---------------------------------------------------------------------------- + 本地 actors: calculator, counter-1, counter-2 + +4. 使用 CLI 格式化输出(只显示用户 actors): + ---------------------------------------------------------------------------- +Name Type Uptime Code Path +----------------------------------------------------------------------------------------------------------- +counter-1 user 0s - +counter-2 user 0s - +calculator user 0s - + +Total: 3 actor(s) + +... +``` + +**重要说明:** + +`pulsing actor list` 是设计用于在**运行中的应用进程内**调用的管理功能,而不是独立的命令行工具。这是因为: + +1. Actor system 是进程本地的,需要在同一进程中才能访问 +2. 这种设计更适合集成到应用的管理接口中 +3. 对于外部查看远程集群,应使用 `pulsing inspect --seeds
` + +**在应用中集成:** + +```python +from pulsing.actor import init, get_system +from pulsing.cli.actor_list import list_actors_impl + +await init() +# ... 创建 actors ... + +# 在管理端点或 REPL 中调用 +await list_actors_impl(all_actors=False, output_format='table') +``` + +## 环境要求 + +- Python 3.11+ +- pyenv(脚本会自动使用 pyenv 的 Python) +- 已安装 Pulsing 包或设置正确的 PYTHONPATH +- Bash shell + +## 开发新脚本 + +创建新的演示脚本时,请遵循以下规范: + +1. **文件命名**:`demo_.sh` +2. **添加注释**:在脚本开头说明功能 +3. **错误处理**:使用 `set -e` 和适当的错误检查 +4. **使用 pyenv**:通过 `pyenv exec python` 调用 Python +5. **可执行权限**:`chmod +x