-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
270 lines (229 loc) · 8.72 KB
/
server.py
File metadata and controls
270 lines (229 loc) · 8.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import os
import json
import pandas as pd
from fastapi import FastAPI, UploadFile, HTTPException, WebSocket, WebSocketDisconnect, Query
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from classify import classify, classify_log_detailed
# Pydantic models for request validation
class LogClassificationRequest(BaseModel):
source: str
log_message: str
app = FastAPI(
title="Log Classification API",
description="Hybrid log classification system with real-time streaming and analytics",
version="1.0.0"
)
# Enable CORS for web applications
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {
"message": "Log Classification API",
"version": "1.0.0",
"endpoints": {
"classify_csv": "/classify/",
"classify_single": "/classify/single",
"analytics": "/analytics/",
"websocket": "/ws/classify"
}
}
@app.post("/classify/")
async def classify_logs(file: UploadFile):
"""
Classify logs from a CSV file.
The CSV file must contain 'source' and 'log_message' columns.
Returns a CSV file with an additional 'target_label' column.
"""
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="File must be a CSV file.")
try:
# Read the uploaded CSV
df = pd.read_csv(file.file)
if "source" not in df.columns or "log_message" not in df.columns:
raise HTTPException(
status_code=400,
detail="CSV must contain 'source' and 'log_message' columns."
)
# Perform classification
df["target_label"] = classify(list(zip(df["source"], df["log_message"])))
# Save the modified file
output_file = "resources/output.csv"
os.makedirs("resources", exist_ok=True)
df.to_csv(output_file, index=False)
return FileResponse(
output_file,
media_type='text/csv',
filename=f"classified_{file.filename}"
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
finally:
file.file.close()
@app.post("/classify/single")
async def classify_single_log(request: LogClassificationRequest):
"""
Classify a single log message with detailed information.
Request body should contain:
{
"source": str,
"log_message": str
}
Returns classification with confidence score and method used.
"""
try:
result = classify_log_detailed(request.source, request.log_message)
result["source"] = request.source
result["log_message"] = request.log_message
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error classifying log: {str(e)}")
@app.websocket("/ws/classify")
async def websocket_classify(websocket: WebSocket):
"""
WebSocket endpoint for real-time log classification.
Send JSON messages in the format:
{
"source": str,
"log_message": str
}
Receive JSON responses in the format:
{
"source": str,
"log_message": str,
"target_label": str,
"confidence": float,
"method": str,
"timestamp": str
}
"""
await websocket.accept()
try:
while True:
# Receive message from client
data = await websocket.receive_text()
message = json.loads(data)
source = message.get("source")
log_message = message.get("log_message")
if not source or not log_message:
await websocket.send_json({
"error": "Message must contain 'source' and 'log_message' fields."
})
continue
# Classify the log
result = classify_log_detailed(source, log_message)
# Prepare response
response = {
"source": source,
"log_message": log_message,
"target_label": result["target_label"],
"confidence": result["confidence"],
"method": result["method"],
"timestamp": datetime.now().isoformat()
}
# Send response back
await websocket.send_json(response)
except WebSocketDisconnect:
print("Client disconnected")
except json.JSONDecodeError:
await websocket.send_json({"error": "Invalid JSON format"})
except Exception as e:
await websocket.send_json({"error": f"Error: {str(e)}"})
@app.get("/analytics/")
async def get_analytics(file: Optional[str] = Query(None, description="Path to CSV file to analyze")):
"""
Get analytics and statistics about classified logs.
If a file path is provided, analyzes that file. Otherwise, analyzes the default output file.
Returns comprehensive statistics including distributions, trends, and insights.
"""
try:
# Determine which file to analyze
if file:
csv_file = file
else:
csv_file = "resources/output.csv"
if not os.path.exists(csv_file):
raise HTTPException(
status_code=404,
detail=f"File not found: {csv_file}. Please classify a CSV file first."
)
# Read the CSV file
df = pd.read_csv(csv_file)
# Validate required columns
if "target_label" not in df.columns:
raise HTTPException(
status_code=400,
detail="File does not contain 'target_label' column. Please classify the file first."
)
# Calculate statistics
total_logs = len(df)
# Distribution by label
label_distribution = df["target_label"].value_counts().to_dict()
label_percentages = (df["target_label"].value_counts(normalize=True) * 100).round(2).to_dict()
# Distribution by source (if available)
source_distribution = {}
source_percentages = {}
if "source" in df.columns:
source_distribution = df["source"].value_counts().to_dict()
source_percentages = (df["source"].value_counts(normalize=True) * 100).round(2).to_dict()
# Top labels
top_labels = df["target_label"].value_counts().head(10).to_dict()
# Most common sources
top_sources = {}
if "source" in df.columns:
top_sources = df["source"].value_counts().head(10).to_dict()
# Statistics summary
unique_labels = df["target_label"].nunique()
unique_sources = df["source"].nunique() if "source" in df.columns else 0
# Calculate label statistics
label_stats = {}
for label in df["target_label"].unique():
count = (df["target_label"] == label).sum()
percentage = (count / total_logs) * 100
label_stats[label] = {
"count": int(count),
"percentage": round(percentage, 2)
}
# Prepare response
analytics = {
"summary": {
"total_logs": int(total_logs),
"unique_labels": int(unique_labels),
"unique_sources": int(unique_sources),
"file_analyzed": csv_file
},
"distributions": {
"by_label": {
"counts": {k: int(v) for k, v in label_distribution.items()},
"percentages": label_percentages
},
"by_source": {
"counts": {k: int(v) for k, v in source_distribution.items()},
"percentages": source_percentages
} if source_distribution else {}
},
"top_items": {
"top_labels": {k: int(v) for k, v in top_labels.items()},
"top_sources": {k: int(v) for k, v in top_sources.items()} if top_sources else {}
},
"label_statistics": label_stats
}
return JSONResponse(content=analytics)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error analyzing file: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)