Skip to content

Commit 8ebcac9

Browse files
authored
Merge pull request #112 from G-Core/feat/multi_logger
multi logger
2 parents 197f776 + 763e3ae commit 8ebcac9

4 files changed

Lines changed: 149 additions & 15 deletions

File tree

crates/http-service/src/executor/http.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ mod tests {
187187
use http_body_util::Empty;
188188
use key_value_store::ReadStats;
189189
use runtime::app::{KvStoreOption, SecretOption, Status};
190-
use runtime::logger::{Logger, NullAppender};
190+
use runtime::logger::Logger;
191191
use runtime::service::ServiceBuilder;
192192
use runtime::util::stats::CdnPhase;
193193
use runtime::{
@@ -249,7 +249,7 @@ mod tests {
249249
type BackendConnector = FastEdgeConnector;
250250

251251
fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger {
252-
Logger::new(NullAppender)
252+
Logger::new()
253253
}
254254

255255
fn backend(&self) -> Backend<FastEdgeConnector> {

crates/runtime/src/logger.rs

Lines changed: 144 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,173 @@ use wasmtime_wasi_io::streams::StreamResult;
1717
#[derive(Clone)]
1818
pub struct Logger {
1919
properties: HashMap<String, String>,
20-
appender: Arc<dyn AppenderBuilder + Send + Sync>,
20+
appenders: Vec<Arc<dyn AppenderBuilder + Send + Sync>>,
2121
}
2222

2323
pub trait AppenderBuilder {
2424
fn build(&self, properties: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync>;
2525
}
2626

27+
/// Fans out writes sequentially to multiple [`AsyncWrite`] sinks.
28+
struct MultiWriter {
29+
writers: Vec<Box<dyn AsyncWrite + Send + Sync>>,
30+
write_current: usize,
31+
write_n: usize,
32+
flush_current: usize,
33+
shutdown_current: usize,
34+
}
35+
36+
impl MultiWriter {
37+
fn new(writers: Vec<Box<dyn AsyncWrite + Send + Sync>>) -> Self {
38+
Self {
39+
writers,
40+
write_current: 0,
41+
write_n: 0,
42+
flush_current: 0,
43+
shutdown_current: 0,
44+
}
45+
}
46+
}
47+
48+
impl AsyncWrite for MultiWriter {
49+
fn poll_write(
50+
self: Pin<&mut Self>,
51+
cx: &mut Context<'_>,
52+
buf: &[u8],
53+
) -> Poll<Result<usize, std::io::Error>> {
54+
let this = self.get_mut();
55+
56+
if this.writers.is_empty() {
57+
return Poll::Ready(Ok(buf.len()));
58+
}
59+
60+
// First writer determines how many bytes are accepted.
61+
if this.write_current == 0 {
62+
// SAFETY: writers are heap-allocated (Box) and won't move.
63+
match unsafe { Pin::new_unchecked(&mut *this.writers[0]) }.poll_write(cx, buf) {
64+
Poll::Ready(Ok(n)) => {
65+
this.write_n = n;
66+
this.write_current = 1;
67+
}
68+
Poll::Ready(Err(e)) => {
69+
tracing::warn!(cause=?e, "MultiWriter: appender 0 write error");
70+
this.write_n = buf.len();
71+
this.write_current = 1;
72+
}
73+
Poll::Pending => return Poll::Pending,
74+
}
75+
}
76+
77+
// Remaining writers receive exactly write_n bytes.
78+
while this.write_current < this.writers.len() {
79+
let idx = this.write_current;
80+
let n = this.write_n;
81+
// SAFETY: writers are heap-allocated (Box) and won't move.
82+
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_write(cx, &buf[..n]) {
83+
Poll::Ready(Ok(_)) => this.write_current += 1,
84+
Poll::Ready(Err(e)) => {
85+
tracing::warn!(cause=?e, "MultiWriter: appender {idx} write error");
86+
this.write_current += 1;
87+
}
88+
Poll::Pending => return Poll::Pending,
89+
}
90+
}
91+
92+
let n = this.write_n;
93+
this.write_current = 0;
94+
Poll::Ready(Ok(n))
95+
}
96+
97+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
98+
let this = self.get_mut();
99+
100+
while this.flush_current < this.writers.len() {
101+
let idx = this.flush_current;
102+
// SAFETY: writers are heap-allocated (Box) and won't move.
103+
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_flush(cx) {
104+
Poll::Ready(_) => this.flush_current += 1,
105+
Poll::Pending => return Poll::Pending,
106+
}
107+
}
108+
109+
this.flush_current = 0;
110+
Poll::Ready(Ok(()))
111+
}
112+
113+
fn poll_shutdown(
114+
self: Pin<&mut Self>,
115+
cx: &mut Context<'_>,
116+
) -> Poll<Result<(), std::io::Error>> {
117+
let this = self.get_mut();
118+
119+
while this.shutdown_current < this.writers.len() {
120+
let idx = this.shutdown_current;
121+
// SAFETY: writers are heap-allocated (Box) and won't move.
122+
match unsafe { Pin::new_unchecked(&mut *this.writers[idx]) }.poll_shutdown(cx) {
123+
Poll::Ready(_) => this.shutdown_current += 1,
124+
Poll::Pending => return Poll::Pending,
125+
}
126+
}
127+
128+
this.shutdown_current = 0;
129+
Poll::Ready(Ok(()))
130+
}
131+
}
132+
27133
impl IsTerminal for Logger {
28134
fn is_terminal(&self) -> bool {
29135
false
30136
}
31137
}
32138

33-
#[async_trait]
34139
impl StdoutStream for Logger {
35140
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
36-
self.appender.build(self.properties.clone())
141+
let writers = self
142+
.appenders
143+
.iter()
144+
.map(|a| a.build(self.properties.clone()))
145+
.collect();
146+
Box::new(MultiWriter::new(writers))
37147
}
38148
}
39149

40150
impl Logger {
41-
pub fn new<S: AppenderBuilder + Sync + Send + 'static>(sink: S) -> Self {
151+
pub fn new() -> Self {
42152
Self {
43153
properties: Default::default(),
44-
appender: Arc::new(sink),
154+
appenders: vec![],
45155
}
46156
}
47157

158+
/// Builder-style method to attach an additional appender.
159+
pub fn with_appender<S: AppenderBuilder + Sync + Send + 'static>(mut self, sink: S) -> Self {
160+
self.appenders.push(Arc::new(sink));
161+
self
162+
}
163+
164+
/// Attach an additional appender to an existing logger.
165+
pub fn add_appender<S: AppenderBuilder + Sync + Send + 'static>(&mut self, sink: S) {
166+
self.appenders.push(Arc::new(sink));
167+
}
168+
48169
pub async fn write_msg(&self, msg: String) {
49-
if let Err(error) = Box::into_pin(self.async_stream())
50-
.write_all(msg.as_bytes())
51-
.await
52-
{
53-
tracing::warn!(cause=?error, "write_msg");
170+
let bytes = msg.as_bytes();
171+
for appender in &self.appenders {
172+
if let Err(error) = Box::into_pin(appender.build(self.properties.clone()))
173+
.write_all(bytes)
174+
.await
175+
{
176+
tracing::warn!(cause=?error, "write_msg");
177+
}
178+
}
179+
}
180+
}
181+
182+
impl Default for Logger {
183+
fn default() -> Self {
184+
Self {
185+
properties: Default::default(),
186+
appenders: vec![Arc::new(NullAppender)],
54187
}
55188
}
56189
}
@@ -61,7 +194,7 @@ impl Extend<(String, String)> for Logger {
61194
}
62195
}
63196

64-
pub struct NullAppender;
197+
struct NullAppender;
65198

66199
impl AppenderBuilder for NullAppender {
67200
fn build(&self, _fields: HashMap<String, String>) -> Box<dyn AsyncWrite + Send + Sync> {

release.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
pre-release-hook = ["git", "cliff", "-o", "CHANGELOG.md", "--tag", "{{version}}" ]
2-
allow-branch = ["releases/**"]
2+
#allow-branch = ["releases/**"]
33
publish = false

src/context.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ impl PreCompiledLoader<u64> for Context {
5151
impl ContextT for Context {
5252
type BackendConnector = HttpsConnector<HttpConnector>;
5353
fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger {
54-
Logger::new(Console::default())
54+
let logger = Logger::new();
55+
logger.with_appender(Console::default())
5556
}
5657

5758
fn backend(&self) -> Backend<HttpsConnector<HttpConnector>> {

0 commit comments

Comments
 (0)