1+ use std:: pin:: Pin ;
12use std:: sync:: Arc ;
23use std:: time:: Duration ;
34
45use crate :: host:: { Host , HostApi } ;
56use crate :: oci:: { self , OciConfig } ;
67use crate :: plugin:: HostPlugin ;
78use anyhow:: Context as _;
8- use futures:: StreamExt as _;
9+ use futures:: { FutureExt , StreamExt as _} ;
910use opentelemetry:: KeyValue ;
1011use opentelemetry_sdk:: resource:: { Resource , ResourceBuilder } ;
1112use opentelemetry_semantic_conventions:: resource;
@@ -107,9 +108,31 @@ impl ClusterHost {
107108 }
108109}
109110
111+ pub struct RunningHost {
112+ host : Arc < Host > ,
113+ run_loop : Pin < Box < dyn Future < Output = anyhow:: Result < ( ) > > + Send + ' static > > ,
114+ }
115+
116+ impl RunningHost {
117+ pub fn host ( & self ) -> & Host {
118+ & self . host
119+ }
120+ }
121+
122+ impl std:: future:: Future for RunningHost {
123+ type Output = anyhow:: Result < ( ) > ;
124+
125+ fn poll (
126+ mut self : Pin < & mut Self > ,
127+ cx : & mut std:: task:: Context < ' _ > ,
128+ ) -> std:: task:: Poll < Self :: Output > {
129+ self . as_mut ( ) . get_mut ( ) . run_loop . poll_unpin ( cx)
130+ }
131+ }
132+
110133pub async fn run_cluster_host (
111134 cluster_host : ClusterHost ,
112- ) -> anyhow:: Result < impl Future < Output = anyhow :: Result < ( ) > > , anyhow:: Error > {
135+ ) -> anyhow:: Result < RunningHost , anyhow:: Error > {
113136 let ( one_shot_tx, mut one_shot_rx) = oneshot:: channel ( ) ;
114137 let nats_client = cluster_host. nats_client . clone ( ) ;
115138 let host = cluster_host
@@ -120,7 +143,7 @@ pub async fn run_cluster_host(
120143
121144 let heartbeat_interval = cluster_host. heartbeat_interval ;
122145 let host_id = host. id ( ) . to_string ( ) ;
123- let host = host. clone ( ) ;
146+ let host_clone = host. clone ( ) ;
124147
125148 let task = tokio:: task:: spawn ( async move {
126149 let host_subject = host_subject ( host_id. as_ref ( ) ) ;
@@ -138,18 +161,18 @@ pub async fn run_cluster_host(
138161 // Shutdown signal
139162 _ = & mut one_shot_rx => {
140163 api_subscription. unsubscribe( ) . await . context( "failed to unsubscribe from API requests" ) ?;
141- return host . stop( ) . await . context( "failed to stop host" ) ;
164+ return host_clone . stop( ) . await . context( "failed to stop host" ) ;
142165 }
143166 // Send heartbeat
144167 _ = heartbeat_timer. tick( ) => {
145- let heartbeat = host_heartbeat( & host ) . await ?;
168+ let heartbeat = host_heartbeat( & host_clone ) . await ?;
146169 let heartbeat_bytes = serde_json:: to_vec( & heartbeat)
147170 . context( "failed to serialize heartbeat" ) ?;
148171 nats_client. publish( heartbeat_subject. clone( ) , heartbeat_bytes. into( ) ) . await . context( "failed to publish heartbeat" ) ?;
149172 }
150173 // Handle API requests
151174 Some ( msg) = api_subscription. next( ) => {
152- let response = handle_command( host . as_ref( ) , & msg) . await ;
175+ let response = handle_command( host_clone . as_ref( ) , & msg) . await ;
153176 match response {
154177 Ok ( resp_bytes) => {
155178 if let Some ( reply_to) = msg. reply {
@@ -165,9 +188,12 @@ pub async fn run_cluster_host(
165188 }
166189 } ) ;
167190
168- Ok ( async move {
169- let _ = one_shot_tx. send ( ( ) ) ;
170- task. await ?
191+ Ok ( RunningHost {
192+ host,
193+ run_loop : Box :: pin ( async move {
194+ let _ = one_shot_tx. send ( ( ) ) ;
195+ task. await ?
196+ } ) ,
171197 } )
172198}
173199
0 commit comments