Skip to content

Commit 5d1b526

Browse files
committed
[improve](function) support any_value function with complex type (apache#49419)
Problem Summary: support any_value function with complex type: array/map/struct bitmap/hll/quantile/agg_state/json
1 parent 8b59672 commit 5d1b526

File tree

5 files changed

+478
-93
lines changed

5 files changed

+478
-93
lines changed

be/src/vec/aggregate_functions/aggregate_function_min_max.cpp

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020

2121
#include "vec/aggregate_functions/aggregate_function_min_max.h"
2222

23+
#include "runtime/define_primitive_type.h"
2324
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
2425
#include "vec/aggregate_functions/factory_helpers.h"
2526
#include "vec/aggregate_functions/helpers.h"
2627
#include "vec/core/types.h"
28+
#include "vec/data_types/data_type.h"
2729
#include "vec/data_types/data_type_nullable.h"
2830

2931
namespace doris::vectorized {
30-
/// min, max, any
32+
/// min, max
3133
template <template <typename> class Data>
3234
AggregateFunctionPtr create_aggregate_function_single_value(const String& name,
3335
const DataTypes& argument_types,
@@ -88,13 +90,40 @@ AggregateFunctionPtr create_aggregate_function_single_value(const String& name,
8890
return nullptr;
8991
}
9092

93+
// any_value
94+
template <template <typename> class Data>
95+
AggregateFunctionPtr create_aggregate_function_single_value_any_value_function(
96+
const String& name, const DataTypes& argument_types, const bool result_is_nullable,
97+
const AggregateFunctionAttr& attr) {
98+
AggregateFunctionPtr res = create_aggregate_function_single_value<Data>(
99+
name, argument_types, result_is_nullable, attr);
100+
if (res) {
101+
return res;
102+
}
103+
const DataTypePtr& argument_type = remove_nullable(argument_types[0]);
104+
if (argument_type->get_primitive_type() == PrimitiveType::TYPE_ARRAY ||
105+
argument_type->get_primitive_type() == PrimitiveType::TYPE_MAP ||
106+
argument_type->get_primitive_type() == PrimitiveType::TYPE_STRUCT ||
107+
argument_type->get_primitive_type() == PrimitiveType::TYPE_AGG_STATE ||
108+
argument_type->get_primitive_type() == PrimitiveType::TYPE_OBJECT ||
109+
argument_type->get_primitive_type() == PrimitiveType::TYPE_HLL ||
110+
argument_type->get_primitive_type() == PrimitiveType::TYPE_QUANTILE_STATE) {
111+
return creator_without_type::create<
112+
AggregateFunctionsSingleValue<SingleValueDataComplexType>>(argument_types,
113+
result_is_nullable);
114+
}
115+
116+
return nullptr;
117+
}
118+
91119
void register_aggregate_function_minmax(AggregateFunctionSimpleFactory& factory) {
92120
factory.register_function_both(
93121
"max", create_aggregate_function_single_value<AggregateFunctionMaxData>);
94122
factory.register_function_both(
95123
"min", create_aggregate_function_single_value<AggregateFunctionMinData>);
96124
factory.register_function_both(
97-
"any", create_aggregate_function_single_value<AggregateFunctionAnyData>);
125+
"any",
126+
create_aggregate_function_single_value_any_value_function<AggregateFunctionAnyData>);
98127
factory.register_alias("any", "any_value");
99128
}
100129

be/src/vec/aggregate_functions/aggregate_function_min_max.h

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
#pragma once
2222

2323
#include <fmt/format.h>
24+
#include <glog/logging.h>
2425
#include <string.h>
2526

2627
#include <memory>
28+
#include <string>
29+
#include <type_traits>
2730
#include <vector>
2831

2932
#include "common/logging.h"
@@ -519,18 +522,104 @@ struct AggregateFunctionMinData : Data {
519522
static const char* name() { return "min"; }
520523
};
521524

525+
// this is used for plain type about any_value function
522526
template <typename Data>
523527
struct AggregateFunctionAnyData : Data {
524528
using Self = AggregateFunctionAnyData;
525529
using Data::IsFixedLength;
530+
static const char* name() { return "any"; }
526531
constexpr static bool IS_ANY = true;
527-
528532
void change_if_better(const IColumn& column, size_t row_num, Arena*) {
529533
this->change_first_time(column, row_num, nullptr);
530534
}
535+
531536
void change_if_better(const Self& to, Arena*) { this->change_first_time(to, nullptr); }
537+
};
532538

539+
// this is used for complex type about any_value function
540+
struct SingleValueDataComplexType {
533541
static const char* name() { return "any"; }
542+
constexpr static bool IS_ANY = true;
543+
constexpr static bool IsFixedLength = false;
544+
using Self = SingleValueDataComplexType;
545+
546+
SingleValueDataComplexType() = default;
547+
548+
SingleValueDataComplexType(const DataTypes& argument_types, int be_version) {
549+
column_type = argument_types[0];
550+
column_data = column_type->create_column();
551+
be_exec_version = be_version;
552+
}
553+
554+
bool has() const { return has_value; }
555+
556+
void change_first_time(const IColumn& column, size_t row_num, Arena*) {
557+
if (UNLIKELY(!has())) {
558+
change_impl(column, row_num);
559+
}
560+
}
561+
562+
void change_first_time(const Self& to, Arena*) {
563+
if (UNLIKELY(!has() && to.has())) {
564+
change_impl(*to.column_data, 0);
565+
}
566+
}
567+
568+
void change_impl(const IColumn& column, size_t row_num) {
569+
DCHECK_EQ(column_data->size(), 0);
570+
column_data->insert_from(column, row_num);
571+
has_value = true;
572+
}
573+
574+
void insert_result_into(IColumn& to) const {
575+
if (has()) {
576+
to.insert_from(*column_data, 0);
577+
} else {
578+
to.insert_default();
579+
}
580+
}
581+
582+
void reset() {
583+
column_data->clear();
584+
has_value = false;
585+
}
586+
587+
void write(BufferWritable& buf) const {
588+
write_binary(has(), buf);
589+
if (!has()) {
590+
return;
591+
}
592+
auto size_bytes =
593+
column_type->get_uncompressed_serialized_bytes(*column_data, be_exec_version);
594+
std::string memory_buffer(size_bytes, '0');
595+
auto* p = column_type->serialize(*column_data, memory_buffer.data(), be_exec_version);
596+
write_binary(memory_buffer, buf);
597+
DCHECK_EQ(p, memory_buffer.data() + size_bytes);
598+
}
599+
600+
void read(BufferReadable& buf, Arena* arena) {
601+
read_binary(has_value, buf);
602+
if (!has()) {
603+
return;
604+
}
605+
std::string memory_buffer;
606+
read_binary(memory_buffer, buf);
607+
const auto* p =
608+
column_type->deserialize(memory_buffer.data(), &column_data, be_exec_version);
609+
DCHECK_EQ(p, memory_buffer.data() + memory_buffer.size());
610+
}
611+
612+
void change_if_better(const IColumn& column, size_t row_num, Arena* arena) {
613+
this->change_first_time(column, row_num, nullptr);
614+
}
615+
616+
void change_if_better(const Self& to, Arena* arena) { this->change_first_time(to, nullptr); }
617+
618+
private:
619+
bool has_value = false;
620+
MutableColumnPtr column_data;
621+
DataTypePtr column_type;
622+
int be_exec_version = -1;
534623
};
535624

536625
template <typename Data>
@@ -539,6 +628,7 @@ class AggregateFunctionsSingleValue final
539628
private:
540629
DataTypePtr& type;
541630
using Base = IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>;
631+
using IAggregateFunction::argument_types;
542632

543633
public:
544634
AggregateFunctionsSingleValue(const DataTypes& arguments)
@@ -555,6 +645,14 @@ class AggregateFunctionsSingleValue final
555645
}
556646
}
557647

648+
void create(AggregateDataPtr __restrict place) const override {
649+
if constexpr (std::is_same_v<Data, SingleValueDataComplexType>) {
650+
new (place) Data(argument_types, IAggregateFunction::version);
651+
} else {
652+
new (place) Data;
653+
}
654+
}
655+
558656
String get_name() const override { return Data::name(); }
559657

560658
DataTypePtr get_return_type() const override { return type; }
@@ -716,4 +814,9 @@ AggregateFunctionPtr create_aggregate_function_single_value(const String& name,
716814
const DataTypes& argument_types,
717815
const bool result_is_nullable,
718816
const AggregateFunctionAttr& attr = {});
817+
818+
template <template <typename> class Data>
819+
AggregateFunctionPtr create_aggregate_function_single_value_any_value_function(
820+
const String& name, const DataTypes& argument_types, const bool result_is_nullable,
821+
const AggregateFunctionAttr& attr = {});
719822
} // namespace doris::vectorized

regression-test/data/nereids_syntax_p0/any_value.out

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,77 @@
55
-- !sql_min --
66
00:00:00
77

8+
-- !sql_any1 --
9+
\N \N \N \N \N \N \N \N
10+
11+
-- !sql_any2 --
12+
13+
-- !sql_any3 --
14+
[1, 2, 3] [4, 5, 6] ["a", "b", "c"] ["d", "e", "f"] {"s_id":1, "s_name":"a", "s_address":"b"} {"s_id":2, "s_name":"c", "s_address":"d"} {"a":1, "b":2} {"c":3, "d":4}
15+
16+
-- !sql_any4 --
17+
[1, 2, 3] [4, 5, 6] ["a", "b", "c"] ["d", "e", "f"] {"s_id":1, "s_name":"a", "s_address":"b"} {"s_id":2, "s_name":"c", "s_address":"d"} {"a":1, "b":2} {"c":3, "d":4}
18+
[4, 5, 6] [7, 8, 9] ["d", "e", "f"] ["g", "h", "i"] {"s_id":3, "s_name":"e", "s_address":"f"} {"s_id":4, "s_name":"g", "s_address":"h"} {"e":5, "f":6} {"g":7, "h":8}
19+
20+
-- !sql_any5 --
21+
\N \N \N \N \N \N \N \N \N \N \N
22+
23+
-- !sql_any6 --
24+
25+
-- !sql_any7 --
26+
1 1 1 1 1.100 a 2021-01-01 2021-01-01T00:00 a 1.1 1.1
27+
28+
-- !sql_any8 --
29+
1 1 1 1 1.100 a 2021-01-01 2021-01-01T00:00 a 1.1 1.1
30+
2 2 2 2 2.200 b 2021-02-02 2021-02-02T00:00 b 2.2 2.2
31+
32+
-- !sql_any9 --
33+
\N
34+
35+
-- !sql_any10 --
36+
37+
-- !sql_any11 --
38+
1,2,3
39+
40+
-- !sql_any12 --
41+
20230101 1,2,3
42+
20230102 4,5,6
43+
44+
-- !sql_any13 --
45+
0
46+
47+
-- !sql_any14 --
48+
49+
-- !sql_any15 --
50+
1
51+
52+
-- !sql_any16 --
53+
1 1
54+
2 1
55+
56+
-- !sql_any17 --
57+
0.0
58+
59+
-- !sql_any18 --
60+
61+
-- !sql_any19 --
62+
1.0
63+
64+
-- !sql_any20 --
65+
20230101 1.0
66+
20230102 200.0
67+
68+
-- !sql_any21 --
69+
\N \N \N
70+
71+
-- !sql_any22 --
72+
73+
-- !sql_any23 --
74+
1 a
75+
76+
-- !sql_any24 --
77+
2 a,a
78+
79+
-- !sql_any25 --
80+
{"DB1":168939,"DNT":"2023-06-10 03:55:33"}
81+

0 commit comments

Comments
 (0)