diff --git a/scrapegraphai/graphs/markdownify_graph.py b/scrapegraphai/graphs/markdownify_graph.py index 78d33b12..7decb259 100644 --- a/scrapegraphai/graphs/markdownify_graph.py +++ b/scrapegraphai/graphs/markdownify_graph.py @@ -64,9 +64,7 @@ def __init__( graph_name="Markdownify", ) - def execute( - self, initial_state: Dict - ) -> Tuple[Dict, List[Dict]]: + def execute(self, initial_state: Dict) -> Tuple[Dict, List[Dict]]: """ Execute the markdownify graph. @@ -80,4 +78,4 @@ def execute( - Dictionary with the markdown result in the "markdown" key - List of execution logs """ - return super().execute(initial_state) \ No newline at end of file + return super().execute(initial_state) diff --git a/scrapegraphai/helpers/models_tokens.py b/scrapegraphai/helpers/models_tokens.py index c700a6dc..9d531e9a 100644 --- a/scrapegraphai/helpers/models_tokens.py +++ b/scrapegraphai/helpers/models_tokens.py @@ -32,7 +32,6 @@ "o1-preview": 128000, "o1-mini": 128000, "o1": 128000, - "gpt-4.5-preview": 128000, "o3-mini": 200000, }, "azure_openai": { diff --git a/scrapegraphai/models/xai.py b/scrapegraphai/models/xai.py index 86969483..64fc79b7 100644 --- a/scrapegraphai/models/xai.py +++ b/scrapegraphai/models/xai.py @@ -1,6 +1,7 @@ """ xAI Grok Module """ + from langchain_openai import ChatOpenAI @@ -19,4 +20,4 @@ def __init__(self, **llm_config): llm_config["openai_api_key"] = llm_config.pop("api_key") llm_config["openai_api_base"] = "https://api.x.ai/v1" - super().__init__(**llm_config) \ No newline at end of file + super().__init__(**llm_config) diff --git a/scrapegraphai/nodes/markdownify_node.py b/scrapegraphai/nodes/markdownify_node.py index 2119908a..da1407a4 100644 --- a/scrapegraphai/nodes/markdownify_node.py +++ b/scrapegraphai/nodes/markdownify_node.py @@ -64,4 +64,4 @@ def execute(self, state: dict) -> dict: # Update state with markdown content state.update({self.output[0]: markdown_content}) - return state \ No newline at end of file + return state diff --git a/scrapegraphai/utils/code_error_analysis.py b/scrapegraphai/utils/code_error_analysis.py index d2c6a42d..7a496f42 100644 --- a/scrapegraphai/utils/code_error_analysis.py +++ b/scrapegraphai/utils/code_error_analysis.py @@ -14,9 +14,9 @@ import json from typing import Any, Dict, Optional -from pydantic import BaseModel, Field, validator -from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import PromptTemplate +from pydantic import BaseModel, Field, validator from ..prompts import ( TEMPLATE_EXECUTION_ANALYSIS, @@ -28,20 +28,25 @@ class AnalysisError(Exception): """Base exception for code analysis errors.""" + pass class InvalidStateError(AnalysisError): """Exception raised when state dictionary is missing required keys.""" + pass class CodeAnalysisState(BaseModel): """Base model for code analysis state validation.""" + generated_code: str = Field(..., description="The generated code to analyze") - errors: Dict[str, Any] = Field(..., description="Dictionary containing error information") + errors: Dict[str, Any] = Field( + ..., description="Dictionary containing error information" + ) - @validator('errors') + @validator("errors") def validate_errors(cls, v): """Ensure errors dictionary has expected structure.""" if not isinstance(v, dict): @@ -51,28 +56,30 @@ def validate_errors(cls, v): class ExecutionAnalysisState(CodeAnalysisState): """Model for execution analysis state validation.""" + html_code: Optional[str] = Field(None, description="HTML code if available") html_analysis: Optional[str] = Field(None, description="Analysis of HTML code") - @validator('errors') + @validator("errors") def validate_execution_errors(cls, v): """Ensure errors dictionary contains execution key.""" super().validate_errors(v) - if 'execution' not in v: + if "execution" not in v: raise ValueError("errors dictionary must contain 'execution' key") return v class ValidationAnalysisState(CodeAnalysisState): """Model for validation analysis state validation.""" + json_schema: Dict[str, Any] = Field(..., description="JSON schema for validation") execution_result: Any = Field(..., description="Result of code execution") - @validator('errors') + @validator("errors") def validate_validation_errors(cls, v): """Ensure errors dictionary contains validation key.""" super().validate_errors(v) - if 'validation' not in v: + if "validation" not in v: raise ValueError("errors dictionary must contain 'validation' key") return v @@ -80,10 +87,10 @@ def validate_validation_errors(cls, v): def get_optimal_analysis_template(error_type: str) -> str: """ Returns the optimal prompt template based on the error type. - + Args: error_type (str): Type of error to analyze. - + Returns: str: The prompt template text. """ @@ -106,10 +113,10 @@ def syntax_focused_analysis(state: Dict[str, Any], llm_model) -> str: Returns: str: The result of the syntax error analysis. - + Raises: InvalidStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'print("Hello World")', @@ -121,26 +128,28 @@ def syntax_focused_analysis(state: Dict[str, Any], llm_model) -> str: # Validate state using Pydantic model validated_state = CodeAnalysisState( generated_code=state.get("generated_code", ""), - errors=state.get("errors", {}) + errors=state.get("errors", {}), ) - + # Check if syntax errors exist if "syntax" not in validated_state.errors: raise InvalidStateError("No syntax errors found in state dictionary") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("syntax"), - input_variables=["generated_code", "errors"] + input_variables=["generated_code", "errors"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "generated_code": validated_state.generated_code, - "errors": validated_state.errors["syntax"] - }) - + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "errors": validated_state.errors["syntax"], + } + ) + except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -157,10 +166,10 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: Returns: str: The result of the execution error analysis. - + Raises: InvalidStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'print(x)', @@ -176,24 +185,26 @@ def execution_focused_analysis(state: Dict[str, Any], llm_model) -> str: generated_code=state.get("generated_code", ""), errors=state.get("errors", {}), html_code=state.get("html_code", ""), - html_analysis=state.get("html_analysis", "") + html_analysis=state.get("html_analysis", ""), ) - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("execution"), input_variables=["generated_code", "errors", "html_code", "html_analysis"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "generated_code": validated_state.generated_code, - "errors": validated_state.errors["execution"], - "html_code": validated_state.html_code, - "html_analysis": validated_state.html_analysis, - }) - + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "errors": validated_state.errors["execution"], + "html_code": validated_state.html_code, + "html_analysis": validated_state.html_analysis, + } + ) + except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -211,10 +222,10 @@ def validation_focused_analysis(state: Dict[str, Any], llm_model) -> str: Returns: str: The result of the validation error analysis. - + Raises: InvalidStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'return {"name": "John"}', @@ -230,24 +241,31 @@ def validation_focused_analysis(state: Dict[str, Any], llm_model) -> str: generated_code=state.get("generated_code", ""), errors=state.get("errors", {}), json_schema=state.get("json_schema", {}), - execution_result=state.get("execution_result", {}) + execution_result=state.get("execution_result", {}), ) - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("validation"), - input_variables=["generated_code", "errors", "json_schema", "execution_result"], + input_variables=[ + "generated_code", + "errors", + "json_schema", + "execution_result", + ], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "generated_code": validated_state.generated_code, - "errors": validated_state.errors["validation"], - "json_schema": validated_state.json_schema, - "execution_result": validated_state.execution_result, - }) - + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "errors": validated_state.errors["validation"], + "json_schema": validated_state.json_schema, + "execution_result": validated_state.execution_result, + } + ) + except KeyError as e: raise InvalidStateError(f"Missing required key in state dictionary: {e}") except Exception as e: @@ -268,10 +286,10 @@ def semantic_focused_analysis( Returns: str: The result of the semantic error analysis. - + Raises: InvalidStateError: If state or comparison_result is missing required keys. - + Example: >>> state = { 'generated_code': 'def add(a, b): return a + b' @@ -286,30 +304,32 @@ def semantic_focused_analysis( # Validate state using Pydantic model validated_state = CodeAnalysisState( generated_code=state.get("generated_code", ""), - errors=state.get("errors", {}) + errors=state.get("errors", {}), ) - + # Validate comparison_result if "differences" not in comparison_result: raise InvalidStateError("comparison_result missing 'differences' key") if "explanation" not in comparison_result: raise InvalidStateError("comparison_result missing 'explanation' key") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_analysis_template("semantic"), input_variables=["generated_code", "differences", "explanation"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated inputs - return chain.invoke({ - "generated_code": validated_state.generated_code, - "differences": json.dumps(comparison_result["differences"], indent=2), - "explanation": comparison_result["explanation"], - }) - + return chain.invoke( + { + "generated_code": validated_state.generated_code, + "differences": json.dumps(comparison_result["differences"], indent=2), + "explanation": comparison_result["explanation"], + } + ) + except KeyError as e: raise InvalidStateError(f"Missing required key: {e}") except Exception as e: - raise AnalysisError(f"Semantic analysis failed: {str(e)}") \ No newline at end of file + raise AnalysisError(f"Semantic analysis failed: {str(e)}") diff --git a/scrapegraphai/utils/code_error_correction.py b/scrapegraphai/utils/code_error_correction.py index 9727c9ad..ac969f87 100644 --- a/scrapegraphai/utils/code_error_correction.py +++ b/scrapegraphai/utils/code_error_correction.py @@ -11,12 +11,12 @@ """ import json -from typing import Any, Dict, Optional from functools import lru_cache +from typing import Any, Dict -from pydantic import BaseModel, Field, validator -from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import PromptTemplate +from pydantic import BaseModel, Field from ..prompts import ( TEMPLATE_EXECUTION_CODE_GENERATION, @@ -28,29 +28,36 @@ class CodeGenerationError(Exception): """Base exception for code generation errors.""" + pass class InvalidCorrectionStateError(CodeGenerationError): """Exception raised when state dictionary is missing required keys.""" + pass class CorrectionState(BaseModel): """Base model for code correction state validation.""" - generated_code: str = Field(..., description="The original generated code to correct") - + + generated_code: str = Field( + ..., description="The original generated code to correct" + ) + class Config: extra = "allow" class ValidationCorrectionState(CorrectionState): """Model for validation correction state validation.""" + json_schema: Dict[str, Any] = Field(..., description="JSON schema for validation") class SemanticCorrectionState(CorrectionState): """Model for semantic correction state validation.""" + execution_result: Any = Field(..., description="Result of code execution") reference_answer: Any = Field(..., description="Reference answer for comparison") @@ -60,10 +67,10 @@ def get_optimal_correction_template(error_type: str) -> str: """ Returns the optimal prompt template for code correction based on the error type. Results are cached for performance. - + Args: error_type (str): Type of error to correct. - + Returns: str: The prompt template text. """ @@ -76,7 +83,9 @@ def get_optimal_correction_template(error_type: str) -> str: return template_registry.get(error_type, TEMPLATE_SYNTAX_CODE_GENERATION) -def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def syntax_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on syntax error analysis. @@ -87,10 +96,10 @@ def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_mod Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys. - + Example: >>> state = { 'generated_code': 'print("Hello World"' @@ -103,30 +112,33 @@ def syntax_focused_code_generation(state: Dict[str, Any], analysis: str, llm_mod validated_state = CorrectionState( generated_code=state.get("generated_code", "") ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("syntax"), input_variables=["analysis", "generated_code"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code - }) - + return chain.invoke( + {"analysis": analysis, "generated_code": validated_state.generated_code} + ) + except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Syntax code generation failed: {str(e)}") -def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def execution_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on execution error analysis. @@ -137,10 +149,10 @@ def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_ Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys or analysis is invalid. - + Example: >>> state = { 'generated_code': 'print(x)' @@ -153,30 +165,33 @@ def execution_focused_code_generation(state: Dict[str, Any], analysis: str, llm_ validated_state = CorrectionState( generated_code=state.get("generated_code", "") ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("execution"), input_variables=["analysis", "generated_code"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code - }) - + return chain.invoke( + {"analysis": analysis, "generated_code": validated_state.generated_code} + ) + except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Execution code generation failed: {str(e)}") -def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def validation_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on validation error analysis. @@ -187,10 +202,10 @@ def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys or analysis is invalid. - + Example: >>> state = { 'generated_code': 'return {"name": "John"}', @@ -203,33 +218,39 @@ def validation_focused_code_generation(state: Dict[str, Any], analysis: str, llm # Validate state using Pydantic model validated_state = ValidationCorrectionState( generated_code=state.get("generated_code", ""), - json_schema=state.get("json_schema", {}) + json_schema=state.get("json_schema", {}), ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("validation"), input_variables=["analysis", "generated_code", "json_schema"], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code, - "json_schema": validated_state.json_schema, - }) - + return chain.invoke( + { + "analysis": analysis, + "generated_code": validated_state.generated_code, + "json_schema": validated_state.json_schema, + } + ) + except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: raise CodeGenerationError(f"Validation code generation failed: {str(e)}") -def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_model) -> str: +def semantic_focused_code_generation( + state: Dict[str, Any], analysis: str, llm_model +) -> str: """ Generates corrected code based on semantic error analysis. @@ -240,10 +261,10 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m Returns: str: The corrected code. - + Raises: InvalidCorrectionStateError: If state is missing required keys or analysis is invalid. - + Example: >>> state = { 'generated_code': 'def add(a, b): return a + b', @@ -258,12 +279,12 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m validated_state = SemanticCorrectionState( generated_code=state.get("generated_code", ""), execution_result=state.get("execution_result", {}), - reference_answer=state.get("reference_answer", {}) + reference_answer=state.get("reference_answer", {}), ) - + if not analysis or not isinstance(analysis, str): raise InvalidCorrectionStateError("Analysis must be a non-empty string") - + # Create prompt template and chain prompt = PromptTemplate( template=get_optimal_correction_template("semantic"), @@ -275,16 +296,24 @@ def semantic_focused_code_generation(state: Dict[str, Any], analysis: str, llm_m ], ) chain = prompt | llm_model | StrOutputParser() - + # Execute chain with validated state - return chain.invoke({ - "analysis": analysis, - "generated_code": validated_state.generated_code, - "generated_result": json.dumps(validated_state.execution_result, indent=2), - "reference_result": json.dumps(validated_state.reference_answer, indent=2), - }) - + return chain.invoke( + { + "analysis": analysis, + "generated_code": validated_state.generated_code, + "generated_result": json.dumps( + validated_state.execution_result, indent=2 + ), + "reference_result": json.dumps( + validated_state.reference_answer, indent=2 + ), + } + ) + except KeyError as e: - raise InvalidCorrectionStateError(f"Missing required key in state dictionary: {e}") + raise InvalidCorrectionStateError( + f"Missing required key in state dictionary: {e}" + ) except Exception as e: - raise CodeGenerationError(f"Semantic code generation failed: {str(e)}") \ No newline at end of file + raise CodeGenerationError(f"Semantic code generation failed: {str(e)}") diff --git a/scrapegraphai/utils/research_web.py b/scrapegraphai/utils/research_web.py index 195e11ca..d633084d 100644 --- a/scrapegraphai/utils/research_web.py +++ b/scrapegraphai/utils/research_web.py @@ -1,70 +1,83 @@ """ -research_web module for web searching across different search engines with improved +research_web module for web searching across different search engines with improved error handling, validation, and security features. """ -import re import random +import re import time -from typing import List, Dict, Union, Optional from functools import wraps +from typing import Dict, List, Optional, Union import requests from bs4 import BeautifulSoup -from pydantic import BaseModel, Field, validator from langchain_community.tools import DuckDuckGoSearchResults +from pydantic import BaseModel, Field, validator class ResearchWebError(Exception): """Base exception for research web errors.""" + pass class SearchConfigError(ResearchWebError): """Exception raised when search configuration is invalid.""" + pass class SearchRequestError(ResearchWebError): """Exception raised when search request fails.""" + pass class ProxyConfig(BaseModel): """Model for proxy configuration validation.""" + server: str = Field(..., description="Proxy server address including port") - username: Optional[str] = Field(None, description="Username for proxy authentication") - password: Optional[str] = Field(None, description="Password for proxy authentication") + username: Optional[str] = Field( + None, description="Username for proxy authentication" + ) + password: Optional[str] = Field( + None, description="Password for proxy authentication" + ) class SearchConfig(BaseModel): """Model for search configuration validation.""" + query: str = Field(..., description="Search query") search_engine: str = Field("duckduckgo", description="Search engine to use") max_results: int = Field(10, description="Maximum number of results to return") port: Optional[int] = Field(8080, description="Port for SearXNG") timeout: int = Field(10, description="Request timeout in seconds") - proxy: Optional[Union[str, Dict, ProxyConfig]] = Field(None, description="Proxy configuration") + proxy: Optional[Union[str, Dict, ProxyConfig]] = Field( + None, description="Proxy configuration" + ) serper_api_key: Optional[str] = Field(None, description="API key for Serper") region: Optional[str] = Field(None, description="Country/region code") language: str = Field("en", description="Language code") - - @validator('search_engine') + + @validator("search_engine") def validate_search_engine(cls, v): """Validate search engine.""" valid_engines = {"duckduckgo", "bing", "searxng", "serper"} if v.lower() not in valid_engines: - raise ValueError(f"Search engine must be one of: {', '.join(valid_engines)}") + raise ValueError( + f"Search engine must be one of: {', '.join(valid_engines)}" + ) return v.lower() - - @validator('query') + + @validator("query") def validate_query(cls, v): """Validate search query.""" if not v or not isinstance(v, str): raise ValueError("Query must be a non-empty string") return v - - @validator('max_results') + + @validator("max_results") def validate_max_results(cls, v): """Validate max results.""" if v < 1 or v > 100: @@ -73,24 +86,24 @@ def validate_max_results(cls, v): # Define advanced PDF detection regex -PDF_REGEX = re.compile(r'\.pdf(#.*)?(\?.*)?$', re.IGNORECASE) +PDF_REGEX = re.compile(r"\.pdf(#.*)?(\?.*)?$", re.IGNORECASE) # Rate limiting decorator def rate_limited(calls: int, period: int = 60): """ Decorator to limit the rate of function calls. - + Args: calls (int): Maximum number of calls allowed in the period. period (int): Time period in seconds. - + Returns: Callable: Decorated function with rate limiting. """ min_interval = period / float(calls) last_called = [0.0] - + def decorator(func): @wraps(func) def wrapper(*args, **kwargs): @@ -101,22 +114,24 @@ def wrapper(*args, **kwargs): result = func(*args, **kwargs) last_called[0] = time.time() return result + return wrapper + return decorator def sanitize_search_query(query: str) -> str: """ Sanitizes search query to prevent injection attacks. - + Args: query (str): The search query. - + Returns: str: Sanitized query. """ # Remove potential command injection characters - sanitized = re.sub(r'[;&|`$()\[\]{}<>]', '', query) + sanitized = re.sub(r"[;&|`$()\[\]{}<>]", "", query) # Trim whitespace sanitized = sanitized.strip() return sanitized @@ -128,14 +143,14 @@ def sanitize_search_query(query: str) -> str: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", - "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1" + "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1", ] def get_random_user_agent() -> str: """ Returns a random user agent from the list. - + Returns: str: Random user agent string. """ @@ -167,10 +182,10 @@ def search_on_web( serper_api_key (str): API key for Serper region (str): Country/region code (e.g., 'mx' for Mexico) language (str): Language code (e.g., 'es' for Spanish) - + Returns: List[str]: List of URLs from search results - + Raises: SearchConfigError: If search configuration is invalid SearchRequestError: If search request fails @@ -179,7 +194,7 @@ def search_on_web( try: # Sanitize query for security sanitized_query = sanitize_search_query(query) - + # Validate search configuration config = SearchConfig( query=sanitized_query, @@ -190,14 +205,14 @@ def search_on_web( proxy=proxy, serper_api_key=serper_api_key, region=region, - language=language + language=language, ) - + # Format proxy once formatted_proxy = None if config.proxy: formatted_proxy = format_proxy(config.proxy) - + results = [] if config.search_engine == "duckduckgo": # Create a DuckDuckGo search object with max_results @@ -209,26 +224,17 @@ def search_on_web( elif config.search_engine == "bing": results = _search_bing( - config.query, - config.max_results, - config.timeout, - formatted_proxy + config.query, config.max_results, config.timeout, formatted_proxy ) elif config.search_engine == "searxng": results = _search_searxng( - config.query, - config.max_results, - config.port, - config.timeout + config.query, config.max_results, config.port, config.timeout ) elif config.search_engine == "serper": results = _search_serper( - config.query, - config.max_results, - config.serper_api_key, - config.timeout + config.query, config.max_results, config.serper_api_key, config.timeout ) return filter_pdf_links(results) @@ -246,40 +252,35 @@ def _search_bing( ) -> List[str]: """ Helper function for Bing search with improved error handling. - + Args: query (str): Search query max_results (int): Maximum number of results to return timeout (int): Request timeout in seconds proxy (str, optional): Proxy configuration - + Returns: List[str]: List of URLs from search results """ - headers = { - "User-Agent": get_random_user_agent() - } - - params = { - "q": query, - "count": max_results - } - + headers = {"User-Agent": get_random_user_agent()} + + params = {"q": query, "count": max_results} + proxies = {"http": proxy, "https": proxy} if proxy else None - + try: response = requests.get( - "https://www.bing.com/search", - params=params, - headers=headers, - proxies=proxies, - timeout=timeout + "https://www.bing.com/search", + params=params, + headers=headers, + proxies=proxies, + timeout=timeout, ) response.raise_for_status() - + soup = BeautifulSoup(response.text, "html.parser") results = [] - + # Extract URLs from Bing search results for link in soup.select("li.b_algo h2 a"): url = link.get("href") @@ -287,31 +288,27 @@ def _search_bing( results.append(url) if len(results) >= max_results: break - + return results except Exception as e: raise SearchRequestError(f"Bing search failed: {str(e)}") -def _search_searxng( - query: str, max_results: int, port: int, timeout: int -) -> List[str]: +def _search_searxng(query: str, max_results: int, port: int, timeout: int) -> List[str]: """ Helper function for SearXNG search. - + Args: query (str): Search query max_results (int): Maximum number of results to return port (int): Port for SearXNG timeout (int): Request timeout in seconds - + Returns: List[str]: List of URLs from search results """ - headers = { - "User-Agent": get_random_user_agent() - } - + headers = {"User-Agent": get_random_user_agent()} + params = { "q": query, "format": "json", @@ -319,18 +316,18 @@ def _search_searxng( "language": "en", "time_range": "", "engines": "duckduckgo,bing,brave", - "results": max_results + "results": max_results, } - + try: response = requests.get( f"http://localhost:{port}/search", params=params, headers=headers, - timeout=timeout + timeout=timeout, ) response.raise_for_status() - + json_data = response.json() results = [result["url"] for result in json_data.get("results", [])] return results[:max_results] @@ -343,48 +340,42 @@ def _search_serper( ) -> List[str]: """ Helper function for Serper search. - + Args: query (str): Search query max_results (int): Maximum number of results to return api_key (str): API key for Serper timeout (int): Request timeout in seconds - + Returns: List[str]: List of URLs from search results """ if not api_key: raise SearchConfigError("Serper API key is required") - - headers = { - "X-API-KEY": api_key, - "Content-Type": "application/json" - } - - data = { - "q": query, - "num": max_results - } - + + headers = {"X-API-KEY": api_key, "Content-Type": "application/json"} + + data = {"q": query, "num": max_results} + try: response = requests.post( "https://google.serper.dev/search", json=data, headers=headers, - timeout=timeout + timeout=timeout, ) response.raise_for_status() - + json_data = response.json() results = [] - + # Extract organic search results for item in json_data.get("organic", []): if "link" in item: results.append(item["link"]) if len(results) >= max_results: break - + return results except Exception as e: raise SearchRequestError(f"Serper search failed: {str(e)}") @@ -393,65 +384,65 @@ def _search_serper( def format_proxy(proxy_config: Union[str, Dict, ProxyConfig]) -> str: """ Format proxy configuration into a string. - + Args: proxy_config: Proxy configuration as string, dict, or ProxyConfig - + Returns: str: Formatted proxy string """ if isinstance(proxy_config, str): return proxy_config - + if isinstance(proxy_config, dict): proxy_config = ProxyConfig(**proxy_config) - + # Format proxy with authentication if provided if proxy_config.username and proxy_config.password: auth = f"{proxy_config.username}:{proxy_config.password}@" return f"http://{auth}{proxy_config.server}" - + return f"http://{proxy_config.server}" def filter_pdf_links(urls: List[str]) -> List[str]: """ Filter out PDF links from search results. - + Args: urls (List[str]): List of URLs - + Returns: List[str]: Filtered list of URLs without PDFs """ return [url for url in urls if not PDF_REGEX.search(url)] -def verify_request_signature(request_data: Dict, signature: str, secret_key: str) -> bool: +def verify_request_signature( + request_data: Dict, signature: str, secret_key: str +) -> bool: """ Verify the signature of an incoming request. - + Args: request_data (Dict): Request data to verify signature (str): Provided signature secret_key (str): Secret key for verification - + Returns: bool: True if signature is valid, False otherwise """ - import hmac import hashlib + import hmac import json - + # Sort keys for consistent serialization data_string = json.dumps(request_data, sort_keys=True) - + # Create HMAC signature computed_signature = hmac.new( - secret_key.encode(), - data_string.encode(), - hashlib.sha256 + secret_key.encode(), data_string.encode(), hashlib.sha256 ).hexdigest() - + # Compare signatures using constant-time comparison to prevent timing attacks - return hmac.compare_digest(computed_signature, signature) \ No newline at end of file + return hmac.compare_digest(computed_signature, signature)