diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java index f26154c5b8..4e2b0c9350 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/filters/specific/DurationSumFilter.java @@ -1,48 +1,105 @@ package com.bakdata.conquery.models.datasets.concepts.filters.specific; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; +import javax.annotation.Nullable; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; import com.bakdata.conquery.apiv1.frontend.FrontendFilterConfiguration; import com.bakdata.conquery.apiv1.frontend.FrontendFilterType; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.models.common.Range; import com.bakdata.conquery.models.config.ConqueryConfig; +import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.concepts.filters.Filter; -import com.bakdata.conquery.models.datasets.concepts.filters.SingleColumnFilter; import com.bakdata.conquery.models.events.MajorTypeId; import com.bakdata.conquery.models.exceptions.ConceptConfigurationException; +import com.bakdata.conquery.models.identifiable.ids.specific.ColumnId; import com.bakdata.conquery.models.query.filter.RangeFilterNode; +import com.bakdata.conquery.models.query.queryplan.aggregators.ColumnAggregator; +import com.bakdata.conquery.models.query.queryplan.aggregators.DistinctValuesWrapperAggregator; import com.bakdata.conquery.models.query.queryplan.aggregators.specific.DurationSumAggregator; import com.bakdata.conquery.models.query.queryplan.filter.FilterNode; -import lombok.Getter; -import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.dropwizard.validation.ValidationMethod; +import lombok.Data; import lombok.extern.slf4j.Slf4j; -@Getter -@Setter +@Data @Slf4j @CPSType(id = "DURATION_SUM", base = Filter.class) -public class DurationSumFilter extends SingleColumnFilter { +public class DurationSumFilter extends Filter { - @Override + @Valid + @NotNull + @JsonAlias("column") + private final ColumnId dateRangeColumn; + + @Valid + @Nullable + private final List distinctBy; + + @JsonIgnore public EnumSet getAcceptedColumnTypes() { return EnumSet.of(MajorTypeId.DATE_RANGE); } @Override public void configureFrontend(FrontendFilterConfiguration.Top f, ConqueryConfig conqueryConfig) throws ConceptConfigurationException { - MajorTypeId type = getColumn().resolve().getType(); + MajorTypeId type = getDateRangeColumn().resolve().getType(); if (type != MajorTypeId.DATE_RANGE) { throw new ConceptConfigurationException(getConnector(), "DURATION_SUM filter is incompatible with columns of type " - + type); + + type + ); } f.setType(FrontendFilterType.Fields.INTEGER_RANGE); f.setMin(0); } + @JsonIgnore + private boolean hasDistinct() { + return distinctBy != null && !distinctBy.isEmpty(); + } + @Override public FilterNode createFilterNode(Range.LongRange value) { - return new RangeFilterNode(value, new DurationSumAggregator(getColumn().resolve())); + ColumnAggregator aggregator = new DurationSumAggregator(getDateRangeColumn().resolve()); + + if (hasDistinct()) { + aggregator = new DistinctValuesWrapperAggregator<>(aggregator, distinctBy.stream().map(ColumnId::resolve).toList()); + } + + return new RangeFilterNode(value, aggregator); } + + @Override + public List getRequiredColumns() { + List required = new ArrayList<>(); + + if (hasDistinct()) { + required.addAll(distinctBy); + } + + required.add(dateRangeColumn); + return required; + + } + + @JsonIgnore + @ValidationMethod(message = "Columns do not match required Type.") + public boolean isValidColumnType() { + final Column resolved = getDateRangeColumn().resolve(); + final boolean acceptable = getAcceptedColumnTypes().contains(resolved.getType()); + + if (!acceptable) { + log.error("Column[{}] is of Type[{}]. Not one of [{}]", resolved.getId(), resolved.getType(), getAcceptedColumnTypes()); + } + + return acceptable; + } + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/connector/specific/DurationSumSelect.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/connector/specific/DurationSumSelect.java index d0f35c2325..f0397b6888 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/connector/specific/DurationSumSelect.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/connector/specific/DurationSumSelect.java @@ -1,5 +1,6 @@ package com.bakdata.conquery.models.datasets.concepts.select.connector.specific; +import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; @@ -8,41 +9,60 @@ import com.bakdata.conquery.models.datasets.concepts.select.Select; import com.bakdata.conquery.models.identifiable.ids.specific.ColumnId; import com.bakdata.conquery.models.query.queryplan.aggregators.Aggregator; +import com.bakdata.conquery.models.query.queryplan.aggregators.DistinctValuesWrapperAggregator; import com.bakdata.conquery.models.query.queryplan.aggregators.specific.DurationSumAggregator; import com.bakdata.conquery.models.types.ResultType; import com.bakdata.conquery.sql.conversion.model.select.DurationSumSelectConverter; import com.bakdata.conquery.sql.conversion.model.select.SelectConverter; -import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; +import lombok.Data; -@Setter -@Getter -@NoArgsConstructor(onConstructor_ = @JsonCreator) +@Data @CPSType(id = "DURATION_SUM", base = Select.class) @JsonIgnoreProperties("categorical") public class DurationSumSelect extends Select implements DaterangeSelectOrFilter { @Nullable - private ColumnId column; - @Nullable - private ColumnId startColumn; + private final ColumnId column; + @Nullable - private ColumnId endColumn; + private final ColumnId startColumn, endColumn; + + private final List distinctBy; @Override public List getRequiredColumns() { + List out = new ArrayList<>(); + if (column != null) { - return List.of(column); + out.add(column); + } + else { + out.add(startColumn); + out.add(endColumn); + } + + if (hasDistinct()) { + out.addAll(distinctBy); } - return List.of(startColumn, endColumn); + return out; + } + + @JsonIgnore + private boolean hasDistinct() { + return distinctBy != null && !distinctBy.isEmpty(); } @Override public Aggregator createAggregator() { - return new DurationSumAggregator(getColumn().resolve()); + DurationSumAggregator aggregator = new DurationSumAggregator(getColumn().resolve()); + + if (!hasDistinct()) { + return aggregator; + } + + return new DistinctValuesWrapperAggregator<>(aggregator, distinctBy.stream().map(ColumnId::resolve).toList()); } @Override @@ -52,6 +72,7 @@ public ResultType getResultType() { @Override public SelectConverter createConverter() { + //TODO apply distinctBy (though needs to be done once other branches are merged) return new DurationSumSelectConverter(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/DistinctValuesWrapperAggregator.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/DistinctValuesWrapperAggregator.java index 39c6610208..9b69175577 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/DistinctValuesWrapperAggregator.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/aggregators/DistinctValuesWrapperAggregator.java @@ -25,13 +25,13 @@ public class DistinctValuesWrapperAggregator extends ColumnAggregator { private final ColumnAggregator aggregator; - - private final Set> observed = new HashSet<>(); - @Getter private final List columns; + private final Set> observed = new HashSet<>(); + + @Override public VALUE createAggregationResult() { return aggregator.createAggregationResult(); @@ -45,6 +45,16 @@ public List getRequiredColumns() { .build(); } + @Override + public void nextTable(QueryExecutionContext ctx, Table currentTable) { + aggregator.nextTable(ctx, currentTable); + } + + @Override + public void nextBlock(Bucket bucket) { + aggregator.nextBlock(bucket); + } + @Override public void collectRequiredTables(Set out) { for (Column column : columns) { diff --git a/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/DURATION_SUM.test.json b/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/DURATION_SUM.test.json new file mode 100644 index 0000000000..9cfb2d6fd7 --- /dev/null +++ b/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/DURATION_SUM.test.json @@ -0,0 +1,66 @@ +{ + "type": "QUERY_TEST", + "label": "DURATION_SUM_AGGREGATOR Test", + "expectedCsv": "tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/expected.csv", + "query": { + "type": "CONCEPT_QUERY", + "root": { + "ids": [ + "concept" + ], + "type": "CONCEPT", + "tables": [ + { + "id": "concept.connector", + "selects": [ + "concept.connector.select" + ] + } + ] + } + }, + "concepts": [ + { + "name": "concept", + "type": "TREE", + "connectors": [ + { + "name": "connector", + "table": "table", + "validityDates": { + "label": "datum", + "column": "table.datum" + }, + "selects": { + "type": "DURATION_SUM", + "name": "select", + "column": "table.datum", + "distinctBy" : ["table.val"] + } + } + ] + } + ], + "content": { + "tables": [ + { + "csv": "tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/content.csv", + "name": "table", + "primaryColumn": { + "name": "pid", + "type": "STRING" + }, + "columns": [ + { + "name": "datum", + "type": "DATE_RANGE" + }, + { + "name": "val", + "type": "STRING" + } + ] + } + ] + } +} diff --git a/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/content.csv b/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/content.csv new file mode 100644 index 0000000000..d96fa45b05 --- /dev/null +++ b/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/content.csv @@ -0,0 +1,8 @@ +pid,datum,val +1,2010-01-01/2010-01-01,a +3,2010-01-01/2010-01-01, +4,2010-01-01/2010-01-01,a +4,2010-01-02/2010-01-02,a +5,2010-01-01/2010-01-01,a +5,2010-01-01/2010-01-02,b +6,/2010-01-01,a \ No newline at end of file diff --git a/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/expected.csv b/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/expected.csv new file mode 100644 index 0000000000..40b3416748 --- /dev/null +++ b/backend/src/test/resources/tests/aggregator/DURATION_SUM_DISTINCT_AGGREGATOR/expected.csv @@ -0,0 +1,6 @@ +result,dates,concept select +1,{2010-01-01/2010-01-01},1 +3,{2010-01-01/2010-01-01}, +4,{2010-01-01/2010-01-02},1 +5,{2010-01-01/2010-01-02},2 +6,{-∞/2010-01-01}, \ No newline at end of file diff --git a/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/DURATION_SUM.test.json b/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/DURATION_SUM.test.json new file mode 100644 index 0000000000..b19e8b3a86 --- /dev/null +++ b/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/DURATION_SUM.test.json @@ -0,0 +1,75 @@ +{ + "type": "QUERY_TEST", + "label": "DURATION_SUM_AGGREGATOR Test", + "expectedCsv": "tests/filter/DURATION_SUM_DISTINCT_FILTER/expected.csv", + "query": { + "type": "CONCEPT_QUERY", + "root": { + "ids": [ + "concept" + ], + "type": "CONCEPT", + "tables": [ + { + "id": "concept.connector", + "filters": [ + { + "type": "INTEGER_RANGE", + "filter": "concept.connector.filter", + "value": { + "min": 1, + "max": 1 + } + } + ] + } + ] + } + }, + "concepts": [ + { + "name": "concept", + "type": "TREE", + "connectors": [ + { + "name": "connector", + "table": "table", + "validityDates": { + "label": "datum", + "column": "table.datum" + }, + "filters": { + "type": "DURATION_SUM", + "name": "filter", + "column": "table.datum", + "distinctBy": [ + "table.val" + ] + } + } + ] + } + ], + "content": { + "tables": [ + { + "csv": "tests/filter/DURATION_SUM_DISTINCT_FILTER/content.csv", + "name": "table", + "primaryColumn": { + "name": "pid", + "type": "STRING" + }, + "columns": [ + { + "name": "datum", + "type": "DATE_RANGE" + }, + { + "name": "val", + "type": "STRING" + } + ] + } + ] + } +} diff --git a/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/content.csv b/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/content.csv new file mode 100644 index 0000000000..d96fa45b05 --- /dev/null +++ b/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/content.csv @@ -0,0 +1,8 @@ +pid,datum,val +1,2010-01-01/2010-01-01,a +3,2010-01-01/2010-01-01, +4,2010-01-01/2010-01-01,a +4,2010-01-02/2010-01-02,a +5,2010-01-01/2010-01-01,a +5,2010-01-01/2010-01-02,b +6,/2010-01-01,a \ No newline at end of file diff --git a/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/expected.csv b/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/expected.csv new file mode 100644 index 0000000000..66b9de37e2 --- /dev/null +++ b/backend/src/test/resources/tests/filter/DURATION_SUM_DISTINCT_FILTER/expected.csv @@ -0,0 +1,3 @@ +result,dates +1,{2010-01-01/2010-01-01} +4,{2010-01-01/2010-01-02}