Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
#include "service/backend_options.h"
#include "service/backend_service.h"
#include "service/point_query_executor.h"
#include "udf/python/python_udf_server.h"
#include "udf/python/python_server.h"
#include "util/bfd_parser.h"
#include "util/bit_util.h"
#include "util/brpc_client_cache.h"
Expand Down Expand Up @@ -890,7 +890,7 @@ void ExecEnv::destroy() {
_s_tracking_memory = false;

clear_storage_resource();
PythonUDFServerManager::instance().shutdown();
PythonServerManager::instance().shutdown();
LOG(INFO) << "Doris exec envorinment is destoried.";
}

Expand Down
149 changes: 149 additions & 0 deletions be/src/udf/python/python_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "udf/python/python_client.h"

#include "arrow/flight/client.h"
#include "arrow/flight/server.h"
#include "common/compiler_util.h"
#include "common/status.h"
#include "udf/python/python_udf_meta.h"
#include "udf/python/python_udf_runtime.h"
#include "util/arrow/utils.h"

namespace doris {

Status PythonClient::init(const PythonUDFMeta& func_meta, ProcessPtr process,
const std::string& operation_name) {
if (_inited) {
return Status::InternalError("PythonClient has already been initialized");
}

_operation_name = operation_name;

// Parse and connect to Python server location
arrow::flight::Location location;
RETURN_DORIS_STATUS_IF_RESULT_ERROR(location,
arrow::flight::Location::Parse(process->get_uri()));
RETURN_DORIS_STATUS_IF_RESULT_ERROR(_arrow_client, FlightClient::Connect(location));

// Serialize function metadata to JSON command
std::string command;
RETURN_IF_ERROR(func_meta.serialize_to_json(&command));

// Create Flight descriptor and establish bidirectional streaming
FlightDescriptor descriptor = FlightDescriptor::Command(command);
arrow::flight::FlightClient::DoExchangeResult exchange_res;
RETURN_DORIS_STATUS_IF_RESULT_ERROR(exchange_res, _arrow_client->DoExchange(descriptor));

_reader = std::move(exchange_res.reader);
_writer = std::move(exchange_res.writer);
_process = std::move(process);
_inited = true;

return Status::OK();
}

Status PythonClient::close() {
if (!_inited || !_writer) {
return Status::OK();
}

auto writer_res = _writer->Close();
if (!writer_res.ok()) {
// Don't propagate error from close, just log it
LOG(WARNING) << "Error closing Python client writer: " << writer_res.message();
}

_inited = false;
_begin = false;
_arrow_client.reset();
_writer.reset();
_reader.reset();

// Return process to pool if available
if (auto* pool = _process->pool(); pool) {
pool->return_process(std::move(_process));
}

return Status::OK();
}

Status PythonClient::handle_error(arrow::Status status) {
DCHECK(!status.ok());

// Clean up resources
_writer.reset();
_reader.reset();
_process->shutdown();

// Extract and clean error message
std::string msg = status.message();
LOG(ERROR) << _operation_name << " error: " << msg;

// Remove Python traceback noise for cleaner error messages
size_t pos = msg.find("The above exception was the direct cause");
if (pos != std::string::npos) {
msg = msg.substr(0, pos);
}

return Status::RuntimeError(trim(msg));
}

Status PythonClient::check_process_alive() const {
if (UNLIKELY(!_process->is_alive())) {
return Status::RuntimeError("{} process is not alive", _operation_name);
}
return Status::OK();
}

Status PythonClient::begin_stream(const std::shared_ptr<arrow::Schema>& schema) {
if (UNLIKELY(!_begin)) {
auto begin_res = _writer->Begin(schema);
if (!begin_res.ok()) {
return handle_error(begin_res);
}
_begin = true;
}
return Status::OK();
}

Status PythonClient::write_batch(const arrow::RecordBatch& input) {
auto write_res = _writer->WriteRecordBatch(input);
if (!write_res.ok()) {
return handle_error(write_res);
}
return Status::OK();
}

Status PythonClient::read_batch(std::shared_ptr<arrow::RecordBatch>* output) {
auto read_res = _reader->Next();
if (!read_res.ok()) {
return handle_error(read_res.status());
}

arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
if (!chunk.data) {
_process->shutdown();
return Status::InternalError("Received null RecordBatch from {} server", _operation_name);
}

*output = std::move(chunk.data);
return Status::OK();
}

} // namespace doris
119 changes: 119 additions & 0 deletions be/src/udf/python/python_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <arrow/status.h>

#include "arrow/flight/client.h"
#include "common/status.h"
#include "udf/python/python_udf_meta.h"
#include "udf/python/python_udf_runtime.h"
#include "util/arrow/utils.h"

namespace doris {

/**
* Base class for Python UDF/UDAF/UDTF clients
*
* Provides common functionality for communicating with Python server via Arrow Flight:
* - Connection management
* - Stream initialization
* - Error handling
* - Process lifecycle management
*/
class PythonClient {
public:
using FlightDescriptor = arrow::flight::FlightDescriptor;
using FlightClient = arrow::flight::FlightClient;
using FlightStreamWriter = arrow::flight::FlightStreamWriter;
using FlightStreamReader = arrow::flight::FlightStreamReader;

PythonClient() = default;
virtual ~PythonClient() = default;

/**
* Initialize connection to Python server
* @param func_meta Function metadata
* @param process Python process handle
* @param operation_name Operation name for error messages (e.g., "Python UDF")
* @return Status
*/
Status init(const PythonUDFMeta& func_meta, ProcessPtr process,
const std::string& operation_name);

/**
* Close connection and cleanup resources
* @return Status
*/
Status close();

/**
* Handle Arrow Flight error
* @param status Arrow status
* @return Doris Status with formatted error message
*/
Status handle_error(arrow::Status status);

/**
* Get process information for debugging
* @return Process string representation
*/
std::string print_process() const { return _process->to_string(); }

protected:
/**
* Check if process is alive
* @return Status
*/
Status check_process_alive() const;

/**
* Begin Flight stream with schema (called only once per stream)
* @param schema Input schema
* @return Status
*/
Status begin_stream(const std::shared_ptr<arrow::Schema>& schema);

/**
* Write RecordBatch to server
* @param input Input RecordBatch
* @return Status
*/
Status write_batch(const arrow::RecordBatch& input);

/**
* Read RecordBatch from server
* @param output Output RecordBatch
* @return Status
*/
Status read_batch(std::shared_ptr<arrow::RecordBatch>* output);

// Common state
bool _inited = false;
bool _begin = false; // Track if Begin() has been called
std::string _operation_name; // Operation name for error messages
std::unique_ptr<FlightClient> _arrow_client;
std::unique_ptr<FlightStreamWriter> _writer;
std::unique_ptr<FlightStreamReader> _reader;
ProcessPtr _process;

private:
DISALLOW_COPY_AND_ASSIGN(PythonClient);
};

} // namespace doris
4 changes: 2 additions & 2 deletions be/src/udf/python/python_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <vector>

#include "common/status.h"
#include "udf/python/python_udf_server.h"
#include "udf/python/python_server.h"
#include "util/string_util.h"

namespace doris {
Expand Down Expand Up @@ -283,7 +283,7 @@ Status PythonVersionManager::init(PythonEnvType env_type, const fs::path& python
std::vector<PythonVersion> versions;
RETURN_IF_ERROR(_env_scanner->scan());
RETURN_IF_ERROR(_env_scanner->get_versions(&versions));
RETURN_IF_ERROR(PythonUDFServerManager::instance().init(versions));
RETURN_IF_ERROR(PythonServerManager::instance().init(versions));
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "udf/python/python_udf_server.h"
#include "udf/python/python_server.h"

#include <butil/fd_utility.h>
#include <dirent.h>
Expand All @@ -29,10 +29,11 @@
#include "common/config.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"

namespace doris {

Status PythonUDFServerManager::init(const std::vector<PythonVersion>& versions) {
Status PythonServerManager::init(const std::vector<PythonVersion>& versions) {
std::lock_guard<std::mutex> lock(_pools_mutex);
for (const auto& version : versions) {
if (_pools.find(version) != _pools.end()) continue;
Expand All @@ -45,9 +46,9 @@ Status PythonUDFServerManager::init(const std::vector<PythonVersion>& versions)
}

template <typename T>
Status PythonUDFServerManager::get_client(const PythonUDFMeta& func_meta,
const PythonVersion& version,
std::shared_ptr<T>* client) {
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta,
const PythonVersion& version,
std::shared_ptr<T>* client) {
PythonUDFProcessPoolPtr* pool = nullptr;
{
std::lock_guard<std::mutex> lock(_pools_mutex);
Expand All @@ -65,12 +66,12 @@ Status PythonUDFServerManager::get_client(const PythonUDFMeta& func_meta,
return Status::OK();
}

Status PythonUDFServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* process) {
Status PythonServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* process) {
DCHECK(pool != nullptr);
const PythonVersion& version = pool->get_python_version();
// e.g. /usr/local/python3.7/bin/python3
std::string python_executable_path = version.get_executable_path();
// e.g. /{DORIS_HOME}/plugins/python_udf/python_udf_server.py
// e.g. /{DORIS_HOME}/plugins/python_udf/python_server.py
std::string fight_server_path = get_fight_server_path();
// e.g. grpc+unix:///home/doris/output/be/lib/udf/python/python_udf
std::string base_unix_socket_path = get_base_unix_socket_path();
Expand Down Expand Up @@ -131,7 +132,7 @@ Status PythonUDFServerManager::fork(PythonUDFProcessPool* pool, ProcessPtr* proc
return Status::OK();
}

void PythonUDFServerManager::shutdown() {
void PythonServerManager::shutdown() {
std::lock_guard lock(_pools_mutex);
for (auto& pool : _pools) {
pool.second->shutdown();
Expand All @@ -140,13 +141,17 @@ void PythonUDFServerManager::shutdown() {
LOG(INFO) << "Python UDF server manager shutdown successfully";
}

// Explicit template instantiation for UDF and UDAF clients
template Status PythonUDFServerManager::get_client<PythonUDFClient>(
// Explicit template instantiation for UDF, UDAF and UDTF clients
template Status PythonServerManager::get_client<PythonUDFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDFClient>* client);

template Status PythonUDFServerManager::get_client<PythonUDAFClient>(
template Status PythonServerManager::get_client<PythonUDAFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDAFClient>* client);

template Status PythonServerManager::get_client<PythonUDTFClient>(
const PythonUDFMeta& func_meta, const PythonVersion& version,
std::shared_ptr<PythonUDTFClient>* client);

} // namespace doris
Loading