diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java index 5807cbfa40c..c1ada4b2371 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/EventCoordinator.java @@ -64,20 +64,20 @@ public static class Event { private final Ample.DataLevel level; private final KeyExtent extent; - Event(EventScope scope, KeyExtent extent) { - this.scope = scope; + Event(KeyExtent extent) { + this.scope = EventScope.TABLE_RANGE; this.level = Ample.DataLevel.of(extent.tableId()); this.extent = extent; } - Event(EventScope scope, TableId tableId) { - this.scope = scope; + Event(TableId tableId) { + this.scope = EventScope.TABLE; this.level = Ample.DataLevel.of(tableId); this.extent = new KeyExtent(tableId, null, null); } - Event(EventScope scope, Ample.DataLevel level) { - this.scope = scope; + Event(Ample.DataLevel level) { + this.scope = EventScope.DATA_LEVEL; this.level = level; this.extent = null; } @@ -117,26 +117,26 @@ public void event(String msg, Object... args) { @Override public void event(Ample.DataLevel level, String msg, Object... args) { log.info(String.format(msg, args)); - publish(new Event(EventScope.DATA_LEVEL, level)); + publish(new Event(level)); } @Override public void event(TableId tableId, String msg, Object... args) { log.info(String.format(msg, args)); - publish(new Event(EventScope.TABLE, tableId)); + publish(new Event(tableId)); } @Override public void event(KeyExtent extent, String msg, Object... args) { log.debug(String.format(msg, args)); - publish(new Event(EventScope.TABLE_RANGE, extent)); + publish(new Event(extent)); } @Override public void event(Collection extents, String msg, Object... args) { if (!extents.isEmpty()) { log.debug(String.format(msg, args)); - extents.forEach(extent -> publish(new Event(EventScope.TABLE_RANGE, extent))); + extents.forEach(extent -> publish(new Event(extent))); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java new file mode 100644 index 00000000000..c9698191e5d --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/EventQueue.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.CountDownTimer; +import org.apache.accumulo.manager.EventCoordinator.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Event queue that collapses events when possible. + */ +public class EventQueue { + + private static final Logger log = LoggerFactory.getLogger(EventQueue.class); + private boolean allLevels = false; + + private static class Table { + final TableId tableId; + boolean allExtents = false; + Map extents = new HashMap<>(); + + private Table(TableId tableId) { + this.tableId = tableId; + } + + public void add(Event event) { + if (allExtents) { + return; + } + + if (event.getScope() == EventCoordinator.EventScope.TABLE) { + allExtents = true; + extents.clear(); + } else { + Preconditions.checkArgument(event.getScope() == EventCoordinator.EventScope.TABLE_RANGE); + extents.put(event.getExtent(), event); + if (extents.size() > 10_000) { + allExtents = true; + extents.clear(); + } + } + } + + public void fill(List events) { + if (allExtents) { + events.add(new Event(tableId)); + } else { + events.addAll(extents.values()); + } + } + } + + private static class Level { + final Ample.DataLevel dataLevel; + boolean allTables = false; + Map tables = new HashMap<>(); + + private Level(Ample.DataLevel dataLevel) { + this.dataLevel = dataLevel; + } + + void add(Event event) { + if (allTables) { + return; + } + + if (event.getScope() == EventCoordinator.EventScope.DATA_LEVEL) { + allTables = true; + tables.clear(); + } else { + var table = tables.computeIfAbsent(event.getTableId(), Table::new); + table.add(event); + } + } + + public void fill(List events) { + if (allTables) { + events.add(new Event(dataLevel)); + } else { + tables.values().forEach(table -> table.fill(events)); + } + } + } + + private HashMap levels = new HashMap<>(); + + public synchronized void add(Event event) { + if (allLevels) { + return; + } + + if (event.getScope() == EventCoordinator.EventScope.ALL) { + allLevels = true; + levels.clear(); + } else { + var level = levels.computeIfAbsent(event.getLevel(), Level::new); + level.add(event); + } + notify(); + } + + private static final List ALL_LEVELS = List.of(new Event()); + + public synchronized List poll(long duration, TimeUnit timeUnit) + throws InterruptedException { + CountDownTimer timer = CountDownTimer.startNew(duration, timeUnit); + while (!allLevels && levels.isEmpty() && !timer.isExpired()) { + wait(Math.max(1, timer.timeLeft(TimeUnit.MILLISECONDS))); + } + + List events; + if (allLevels) { + events = ALL_LEVELS; + } else { + events = new ArrayList<>(); + levels.values().forEach(l -> l.fill(events)); + } + + // reset back to empty + allLevels = false; + levels.clear(); + + return events; + } + + public List take() throws InterruptedException { + return poll(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 01575d6416d..5fd56b3c2cc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -39,8 +40,6 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -77,6 +76,8 @@ import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; +import org.apache.accumulo.manager.EventCoordinator.Event; +import org.apache.accumulo.manager.EventCoordinator.EventScope; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; import org.apache.accumulo.manager.state.TableCounts; @@ -230,26 +231,40 @@ class EventHandler implements EventCoordinator.Listener { // created, so just start off with full scan. private boolean needsFullScan = true; - private final BlockingQueue rangesToProcess; + private final EventQueue eventQueue; - class RangeProccessor implements Runnable { + class RangeProcessor implements Runnable { @Override public void run() { try { while (manager.stillManager()) { - var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS); - if (range == null) { + var events = eventQueue.poll(100, TimeUnit.MILLISECONDS); + + if (events.isEmpty()) { // check to see if still the manager continue; } - ArrayList ranges = new ArrayList<>(); - ranges.add(range); - - rangesToProcess.drainTo(ranges); + EnumSet scopesSeen = EnumSet.noneOf(EventScope.class); + List ranges = new ArrayList<>(events.size()); + for (var event : events) { + scopesSeen.add(event.getScope()); + if (event.getScope() == EventScope.TABLE + || event.getScope() == EventScope.TABLE_RANGE) { + ranges.add(event.getExtent().toMetaRange()); + } + } - if (!processRanges(ranges)) { + if (scopesSeen.contains(EventScope.ALL) || scopesSeen.contains(EventScope.DATA_LEVEL)) { + // Since this code should only receive events for a single data level, and seeing a + // data level should squish all table and tablet events, then seeing ranges indicates + // assumptions this code is making are incorrect or there is a bug somewhere. + Preconditions.checkState(ranges.isEmpty()); setNeedsFullScan(); + } else { + if (!processRanges(ranges)) { + setNeedsFullScan(); + } } } } catch (InterruptedException e) { @@ -259,10 +274,9 @@ public void run() { } EventHandler() { - rangesToProcess = new ArrayBlockingQueue<>(10000); - + eventQueue = new EventQueue(); Threads.createCriticalThread("TGW [" + store.name() + "] event range processor", - new RangeProccessor()).start(); + new RangeProcessor()).start(); } private synchronized void setNeedsFullScan() { @@ -279,24 +293,8 @@ public synchronized boolean isNeedsFullScan() { } @Override - public void process(EventCoordinator.Event event) { - - switch (event.getScope()) { - case ALL: - case DATA_LEVEL: - setNeedsFullScan(); - break; - case TABLE: - case TABLE_RANGE: - if (!rangesToProcess.offer(event.getExtent().toMetaRange())) { - Manager.log.debug("[{}] unable to process event range {} because queue is full", - store.name(), event.getExtent()); - setNeedsFullScan(); - } - break; - default: - throw new IllegalArgumentException("Unhandled scope " + event.getScope()); - } + public void process(Event event) { + eventQueue.add(event); } synchronized void waitForFullScan(long millis) { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/EventQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/EventQueueTest.java new file mode 100644 index 00000000000..e05fb66889b --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/EventQueueTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.manager.EventCoordinator.Event; +import org.apache.accumulo.manager.EventCoordinator.EventScope; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class EventQueueTest { + @Test + public void testAll() throws Exception { + EventQueue eventQueue = new EventQueue(); + + var tableId1 = TableId.of("1"); + var tableId2 = TableId.of("2"); + var tableId3 = TableId.of("3"); + + var extent1a = new KeyExtent(tableId1, new Text("m"), null); + var extent1b = new KeyExtent(tableId1, null, new Text("m")); + var extent2 = new KeyExtent(tableId2, null, null); + + List eventsToAdd1 = new ArrayList<>(); + eventsToAdd1.add(new Event()); + eventsToAdd1.add(new Event(DataLevel.METADATA)); + eventsToAdd1.add(new Event(tableId1)); + eventsToAdd1.add(new Event(tableId2)); + eventsToAdd1.add(new Event(extent1a)); + eventsToAdd1.add(new Event(extent1b)); + eventsToAdd1.add(new Event(extent2)); + + List eventsToAdd2 = + new ArrayList<>(List.of(new Event(DataLevel.METADATA), new Event(tableId2), + new Event(extent1a), new Event(extent1b), new Event(extent2), new Event(tableId3))); + List eventsToAdd3 = new ArrayList<>(List.of(new Event(DataLevel.USER), + new Event(tableId2), new Event(extent1a), new Event(extent1b), new Event(extent2))); + for (int i = 0; i < 10; i++) { + // should see the same result for events added in any order + Collections.shuffle(eventsToAdd1); + eventsToAdd1.forEach(eventQueue::add); + var events = eventQueue.take(); + assertEquals(1, events.size()); + assertEquals(EventScope.ALL, events.get(0).getScope()); + + Collections.shuffle(eventsToAdd2); + eventsToAdd2.forEach(eventQueue::add); + events = eventQueue.take(); + assertEquals(5, events.size()); + assertTrue(events.stream().anyMatch( + e -> e.getScope() == EventScope.DATA_LEVEL && e.getLevel() == DataLevel.METADATA)); + assertTrue(events.stream().anyMatch(e -> e.getScope() == EventScope.TABLE + && e.getLevel() == DataLevel.USER && e.getTableId().equals(tableId2))); + assertTrue(events.stream().anyMatch(e -> e.getScope() == EventScope.TABLE + && e.getLevel() == DataLevel.USER && e.getTableId().equals(tableId3))); + assertTrue(events.stream() + .anyMatch(e -> e.getScope() == EventScope.TABLE_RANGE && e.getLevel() == DataLevel.USER + && e.getTableId().equals(tableId1) && e.getExtent().equals(extent1a))); + assertTrue(events.stream() + .anyMatch(e -> e.getScope() == EventScope.TABLE_RANGE && e.getLevel() == DataLevel.USER + && e.getTableId().equals(tableId1) && e.getExtent().equals(extent1b))); + + Collections.shuffle(eventsToAdd3); + eventsToAdd3.forEach(eventQueue::add); + events = eventQueue.take(); + assertEquals(1, events.size()); + assertTrue(events.stream() + .anyMatch(e -> e.getScope() == EventScope.DATA_LEVEL && e.getLevel() == DataLevel.USER)); + + assertEquals(0, eventQueue.poll(1, TimeUnit.MILLISECONDS).size()); + } + + } +}