diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 5b5f4e0..ac5d1e3 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -127,6 +127,12 @@ def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]: values.append(float(val["doubleVal"])) elif "stringVal" in val: values.append(val["stringVal"]) + elif "timestampVal" in val: + values.append( + datetime.fromisoformat( + val["timestampVal"].replace("Z", "+00:00") + ) + ) else: values.append(None) # Handle list value cells (like those from array_distinct) @@ -139,6 +145,12 @@ def process_stats_results(stats: dict[str, Any]) -> dict[str, Any]: list_values.append(float(list_val["doubleVal"])) elif "stringVal" in list_val: list_values.append(list_val["stringVal"]) + elif "timestampVal" in list_val: + list_values.append( + datetime.fromisoformat( + list_val["timestampVal"].replace("Z", "+00:00") + ) + ) values.append(list_values) else: values.append(None) diff --git a/tests/chronicle/test_stats.py b/tests/chronicle/test_stats.py index 08f68e5..b87f1a7 100644 --- a/tests/chronicle/test_stats.py +++ b/tests/chronicle/test_stats.py @@ -104,6 +104,93 @@ def test_get_stats_array_distinct(self) -> None: self.assertEqual(result["rows"][0]["array_col"], ["X1", "X2"]) self.assertEqual(result["rows"][1]["array_col"], ["Y1", "Y2"]) + def test_get_stats_timestamp_values(self) -> None: + """Test get_stats with timestampVal support.""" + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "stats": { + "results": [ + { + "column": "timestamp_col", + "values": [ + {"value": {"timestampVal": "2024-01-15T10:30:00Z"}}, + { + "value": { + "timestampVal": "2024-01-15T11:45:30.123Z" + } + }, + ], + }, + { + "column": "event_count", + "values": [ + {"value": {"int64Val": "100"}}, + {"value": {"int64Val": "200"}}, + ], + }, + ] + } + } + self.mock_session.get.return_value = mock_response + + result = get_stats( + self.mock_client, "test query", self.start_time, self.end_time + ) + + self.assertEqual(result["total_rows"], 2) + self.assertEqual(result["columns"], ["timestamp_col", "event_count"]) + self.assertEqual(len(result["rows"]), 2) + self.assertIsInstance(result["rows"][0]["timestamp_col"], datetime) + self.assertIsInstance(result["rows"][1]["timestamp_col"], datetime) + self.assertEqual(result["rows"][0]["event_count"], 100) + self.assertEqual(result["rows"][1]["event_count"], 200) + + def test_get_stats_timestamp_in_list(self) -> None: + """Test get_stats with timestampVal in list values.""" + mock_response = mock.MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "stats": { + "results": [ + { + "column": "timestamp_array", + "values": [ + { + "list": { + "values": [ + { + "timestampVal": ( + "2024-01-15T10:00:00Z" + ) + }, + { + "timestampVal": ( + "2024-01-15T11:00:00Z" + ) + }, + ] + } + }, + ], + } + ] + } + } + self.mock_session.get.return_value = mock_response + + result = get_stats( + self.mock_client, "test query", self.start_time, self.end_time + ) + + self.assertEqual(result["total_rows"], 1) + self.assertEqual(result["columns"], ["timestamp_array"]) + self.assertEqual(len(result["rows"]), 1) + self.assertIsInstance(result["rows"][0]["timestamp_array"], list) + self.assertEqual(len(result["rows"][0]["timestamp_array"]), 2) + self.assertIsInstance(result["rows"][0]["timestamp_array"][0], datetime) + self.assertIsInstance(result["rows"][0]["timestamp_array"][1], datetime) + def test_process_stats_results_empty(self) -> None: """Test processing empty stats results.""" empty_stats: Dict[str, Any] = {}