Skip to content

Commit 481103c

Browse files
author
yangshijie
committed
[Feat](udtf) Support Python UDTF for Doris
1 parent e38a13b commit 481103c

File tree

60 files changed

+16444
-192
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

60 files changed

+16444
-192
lines changed

be/src/udf/python/python_udaf_client.cpp

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -440,7 +440,7 @@ Status PythonUDAFClient::close() {
440440

441441
auto writer_res = _writer->Close();
442442
if (!writer_res.ok()) {
443-
return _handle_error(writer_res);
443+
return handle_error(writer_res);
444444
}
445445

446446
_inited = false;
@@ -529,22 +529,22 @@ Status PythonUDAFClient::_send_operation(const arrow::RecordBatch* input,
529529
// Always use the unified schema for all operations
530530
auto begin_res = _writer->Begin(kUnifiedUDAFSchema);
531531
if (!begin_res.ok()) {
532-
return _handle_error(begin_res);
532+
return handle_error(begin_res);
533533
}
534534
_begin = true;
535535
}
536536

537537
// Step 2: Write the record batch to server
538538
auto write_res = _writer->WriteRecordBatch(*input);
539539
if (!write_res.ok()) {
540-
return _handle_error(write_res);
540+
return handle_error(write_res);
541541
}
542542

543543
// Step 3: Read response from server (if output is expected)
544544
if (output != nullptr) {
545545
auto read_res = _reader->Next();
546546
if (!read_res.ok()) {
547-
return _handle_error(read_res.status());
547+
return handle_error(read_res.status());
548548
}
549549

550550
arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
@@ -581,13 +581,13 @@ Status PythonUDAFClient::_send_operation(const arrow::RecordBatch* input,
581581
return Status::OK();
582582
}
583583

584-
Status PythonUDAFClient::_handle_error(arrow::Status status) {
584+
Status PythonUDAFClient::handle_error(arrow::Status status) {
585585
DCHECK(!status.ok());
586586
_writer.reset();
587587
_reader.reset();
588588
_process->shutdown();
589-
590589
std::string msg = status.message();
590+
LOG(ERROR) << "Python UDAF error: " << msg;
591591
// Remove Python traceback noise
592592
size_t pos = msg.find("The above exception was the direct cause");
593593
if (pos != std::string::npos) {

be/src/udf/python/python_udaf_client.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -147,15 +147,15 @@ class PythonUDAFClient {
147147

148148
Status close();
149149

150+
Status handle_error(arrow::Status status);
151+
150152
std::string print_process() const { return _process->to_string(); }
151153

152154
static std::string print_operation(UDAFOperation op);
153155

154156
private:
155157
DISALLOW_COPY_AND_ASSIGN(PythonUDAFClient);
156158

157-
Status _handle_error(arrow::Status status);
158-
159159
/**
160160
* Helper to execute a UDAF operation (CREATE, RESET, DESTROY, etc.)
161161
* This consolidates the common pattern:

be/src/udf/python/python_udf_client.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,8 @@ Status PythonUDFClient::handle_error(arrow::Status status) {
9898
_reader.reset();
9999
_process->shutdown();
100100
std::string msg = status.message();
101+
LOG(ERROR) << "Python UDF error: " << msg;
102+
// Remove Python traceback noise
101103
size_t pos = msg.find("The above exception was the direct cause");
102104
if (pos != std::string::npos) {
103105
msg = msg.substr(0, pos);

be/src/udf/python/python_udf_server.cpp

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@
2929
#include "common/config.h"
3030
#include "udf/python/python_udaf_client.h"
3131
#include "udf/python/python_udf_client.h"
32+
#include "udf/python/python_udtf_client.h"
3233

3334
namespace doris {
3435

@@ -140,7 +141,7 @@ void PythonUDFServerManager::shutdown() {
140141
LOG(INFO) << "Python UDF server manager shutdown successfully";
141142
}
142143

143-
// Explicit template instantiation for UDF and UDAF clients
144+
// Explicit template instantiation for UDF, UDAF and UDTF clients
144145
template Status PythonUDFServerManager::get_client<PythonUDFClient>(
145146
const PythonUDFMeta& func_meta, const PythonVersion& version,
146147
std::shared_ptr<PythonUDFClient>* client);
@@ -149,4 +150,8 @@ template Status PythonUDFServerManager::get_client<PythonUDAFClient>(
149150
const PythonUDFMeta& func_meta, const PythonVersion& version,
150151
std::shared_ptr<PythonUDAFClient>* client);
151152

153+
template Status PythonUDFServerManager::get_client<PythonUDTFClient>(
154+
const PythonUDFMeta& func_meta, const PythonVersion& version,
155+
std::shared_ptr<PythonUDTFClient>* client);
156+
152157
} // namespace doris

0 commit comments

Comments
 (0)