Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,48 +1,105 @@
package com.bakdata.conquery.models.datasets.concepts.filters.specific;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import javax.annotation.Nullable;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.apiv1.frontend.FrontendFilterConfiguration;
import com.bakdata.conquery.apiv1.frontend.FrontendFilterType;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.common.Range;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.exceptions.ConceptConfigurationException;
import com.bakdata.conquery.models.identifiable.ids.specific.ColumnId;
import com.bakdata.conquery.models.query.filter.RangeFilterNode;
import com.bakdata.conquery.models.query.queryplan.aggregators.ColumnAggregator;
import com.bakdata.conquery.models.query.queryplan.aggregators.DistinctValuesWrapperAggregator;
import com.bakdata.conquery.models.query.queryplan.aggregators.specific.DurationSumAggregator;
import com.bakdata.conquery.models.query.queryplan.filter.FilterNode;
import lombok.Getter;
import lombok.Setter;
import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.dropwizard.validation.ValidationMethod;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Getter
@Setter
@Data
@Slf4j
@CPSType(id = "DURATION_SUM", base = Filter.class)
public class DurationSumFilter extends SingleColumnFilter<Range.LongRange> {
public class DurationSumFilter extends Filter<Range.LongRange> {

@Override
@Valid
@NotNull
@JsonAlias("column")
private final ColumnId dateRangeColumn;

@Valid
@Nullable
private final List<ColumnId> distinctBy;

@JsonIgnore
public EnumSet<MajorTypeId> getAcceptedColumnTypes() {
return EnumSet.of(MajorTypeId.DATE_RANGE);
}

@Override
public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException {
MajorTypeId type = getColumn().resolve().getType();
MajorTypeId type = getDateRangeColumn().resolve().getType();
if (type != MajorTypeId.DATE_RANGE) {
throw new ConceptConfigurationException(getConnector(), "DURATION_SUM filter is incompatible with columns of type "
+ type);
+ type
);
}

f.setType(FrontendFilterType.Fields.INTEGER_RANGE);
f.setMin(0);
}

@JsonIgnore
private boolean hasDistinct() {
return distinctBy != null && !distinctBy.isEmpty();
}

@Override
public FilterNode createFilterNode(Range.LongRange value) {
return new RangeFilterNode(value, new DurationSumAggregator(getColumn().resolve()));
ColumnAggregator<?> aggregator = new DurationSumAggregator(getDateRangeColumn().resolve());

if (hasDistinct()) {
aggregator = new DistinctValuesWrapperAggregator<>(aggregator, distinctBy.stream().map(ColumnId::resolve).toList());
}

return new RangeFilterNode(value, aggregator);
}

@Override
public List<ColumnId> getRequiredColumns() {
List<ColumnId> required = new ArrayList<>();

if (hasDistinct()) {
required.addAll(distinctBy);
}

required.add(dateRangeColumn);
return required;

}

@JsonIgnore
@ValidationMethod(message = "Columns do not match required Type.")
public boolean isValidColumnType() {
final Column resolved = getDateRangeColumn().resolve();
final boolean acceptable = getAcceptedColumnTypes().contains(resolved.getType());

if (!acceptable) {
log.error("Column[{}] is of Type[{}]. Not one of [{}]", resolved.getId(), resolved.getType(), getAcceptedColumnTypes());
}

return acceptable;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery.models.datasets.concepts.select.connector.specific;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

Expand All @@ -8,41 +9,60 @@
import com.bakdata.conquery.models.datasets.concepts.select.Select;
import com.bakdata.conquery.models.identifiable.ids.specific.ColumnId;
import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator;
import com.bakdata.conquery.models.query.queryplan.aggregators.DistinctValuesWrapperAggregator;
import com.bakdata.conquery.models.query.queryplan.aggregators.specific.DurationSumAggregator;
import com.bakdata.conquery.models.types.ResultType;
import com.bakdata.conquery.sql.conversion.model.select.DurationSumSelectConverter;
import com.bakdata.conquery.sql.conversion.model.select.SelectConverter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.Data;

@Setter
@Getter
@NoArgsConstructor(onConstructor_ = @JsonCreator)
@Data
@CPSType(id = "DURATION_SUM", base = Select.class)
@JsonIgnoreProperties("categorical")
public class DurationSumSelect extends Select implements DaterangeSelectOrFilter {

@Nullable
private ColumnId column;
@Nullable
private ColumnId startColumn;
private final ColumnId column;

@Nullable
private ColumnId endColumn;
private final ColumnId startColumn, endColumn;

private final List<ColumnId> distinctBy;

@Override
public List<ColumnId> getRequiredColumns() {
List<ColumnId> out = new ArrayList<>();

if (column != null) {
return List.of(column);
out.add(column);
}
else {
out.add(startColumn);
out.add(endColumn);
}

if (hasDistinct()) {
out.addAll(distinctBy);
}
return List.of(startColumn, endColumn);
return out;
}

@JsonIgnore
private boolean hasDistinct() {
return distinctBy != null && !distinctBy.isEmpty();
}

@Override
public Aggregator<?> createAggregator() {
return new DurationSumAggregator(getColumn().resolve());
DurationSumAggregator aggregator = new DurationSumAggregator(getColumn().resolve());

if (!hasDistinct()) {
return aggregator;
}

return new DistinctValuesWrapperAggregator<>(aggregator, distinctBy.stream().map(ColumnId::resolve).toList());
}

@Override
Expand All @@ -52,6 +72,7 @@ public ResultType getResultType() {

@Override
public SelectConverter<DurationSumSelect> createConverter() {
//TODO apply distinctBy (though needs to be done once other branches are merged)
return new DurationSumSelectConverter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
public class DistinctValuesWrapperAggregator<VALUE> extends ColumnAggregator<VALUE> {

private final ColumnAggregator<VALUE> aggregator;

private final Set<List<Object>> observed = new HashSet<>();

@Getter
private final List<Column> columns;


private final Set<List<Object>> observed = new HashSet<>();


@Override
public VALUE createAggregationResult() {
return aggregator.createAggregationResult();
Expand All @@ -45,6 +45,16 @@ public List<Column> getRequiredColumns() {
.build();
}

@Override
public void nextTable(QueryExecutionContext ctx, Table currentTable) {
aggregator.nextTable(ctx, currentTable);
}

@Override
public void nextBlock(Bucket bucket) {
aggregator.nextBlock(bucket);
}

@Override
public void collectRequiredTables(Set<Table> out) {
for (Column column : columns) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"type": "QUERY_TEST",
"label": "DURATION_SUM_AGGREGATOR Test",
"expectedCsv": "tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/expected.csv",
"query": {
"type": "CONCEPT_QUERY",
"root": {
"ids": [
"concept"
],
"type": "CONCEPT",
"tables": [
{
"id": "concept.connector",
"selects": [
"concept.connector.select"
]
}
]
}
},
"concepts": [
{
"name": "concept",
"type": "TREE",
"connectors": [
{
"name": "connector",
"table": "table",
"validityDates": {
"label": "datum",
"column": "table.datum"
},
"selects": {
"type": "DURATION_SUM",
"name": "select",
"column": "table.datum",
"distinctBy" : ["table.val"]
}
}
]
}
],
"content": {
"tables": [
{
"csv": "tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/content.csv",
"name": "table",
"primaryColumn": {
"name": "pid",
"type": "STRING"
},
"columns": [
{
"name": "datum",
"type": "DATE_RANGE"
},
{
"name": "val",
"type": "STRING"
}
]
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pid,datum,val
1,2010-01-01/2010-01-01,a
3,2010-01-01/2010-01-01,
4,2010-01-01/2010-01-01,a
4,2010-01-02/2010-01-02,a
5,2010-01-01/2010-01-01,a
5,2010-01-01/2010-01-02,b
6,/2010-01-01,a
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
result,dates,concept select
1,{2010-01-01/2010-01-01},1
3,{2010-01-01/2010-01-01},
4,{2010-01-01/2010-01-02},1
5,{2010-01-01/2010-01-02},2
6,{-∞/2010-01-01},
Comment on lines +1 to +6
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow das sieht irgendwie gefährlich und unintuitiv aus. Das Ergebnis hängt von der Sortierung der Zeilen ab 👀

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naja, es geht am Ende nicht anders. Es geht ja darum, dass bestimmte Rezepte oä nur einmal gezählt werden. Aber ist definitiv wert in der Doku zu erwähnen, guter Punkt.

Loading
Loading