diff --git a/exec/java-exec/src/main/java/org/apache/drill/common/DrillNode.java b/exec/java-exec/src/main/java/org/apache/drill/common/DrillNode.java index 8af70a3132c..26ba3edc830 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/common/DrillNode.java +++ b/exec/java-exec/src/main/java/org/apache/drill/common/DrillNode.java @@ -35,17 +35,38 @@ public static DrillNode create(DrillbitEndpoint endpoint) { return new DrillNode(endpoint); } - public boolean equals(Object other) { - if (!(other instanceof DrillNode)) { - return false; + public boolean equals(Object obj) { + if (obj == this) { + return true; } + if (!(obj instanceof org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint)) { + return super.equals(obj); + } + org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint other = (org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint) obj; - DrillbitEndpoint otherEndpoint = ((DrillNode) other).endpoint; - return endpoint.getAddress().equals(otherEndpoint.getAddress()) && - endpoint.getUserPort() == otherEndpoint.getUserPort() && - endpoint.getControlPort() == otherEndpoint.getControlPort() && - endpoint.getDataPort() == otherEndpoint.getDataPort() && - endpoint.getVersion().equals(otherEndpoint.getVersion()); + boolean result = true; + result = result && (endpoint.hasAddress() == other.hasAddress()); + if (endpoint.hasAddress()) { + result = result && endpoint.getAddress() + .equals(other.getAddress()); + } + result = result && (endpoint.hasUserPort() == other.hasUserPort()); + if (endpoint.hasUserPort()) { + result = result && (endpoint.getUserPort() == other.getUserPort()); + } + result = result && (endpoint.hasControlPort() == other.hasControlPort()); + if (endpoint.hasControlPort()) { + result = result && (endpoint.getControlPort() == other.getControlPort()); + } + result = result && (endpoint.hasDataPort() == other.hasDataPort()); + if (endpoint.hasDataPort()) { + result = result && (endpoint.getDataPort() == other.getDataPort()); + } + result = result && (endpoint.hasVersion() == other.hasVersion()); + if (endpoint.hasVersion()) { + result = result && endpoint.getVersion().equals(other.getVersion()); + } + return result; } @Override @@ -81,8 +102,8 @@ public String toString() { StringBuilder sb = new StringBuilder(); return sb.append("endpoint address :") - .append(endpoint.getAddress()) + .append(endpoint.hasAddress() ? endpoint.getAddress() : "no-address") .append("endpoint user port: ") - .append(endpoint.getUserPort()).toString(); + .append(endpoint.hasUserPort() ? endpoint.getUserPort() : "no-userport").toString(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index f682104aa1a..8061eab67fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) { } @VisibleForTesting - public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) { + public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long optimalMemAllocation) { super(); this.allocator = allocator; this.operatorId = operatorId; @@ -97,7 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll this.recordsReceivedByInput = new long[inputCount]; this.batchesReceivedByInput = new long[inputCount]; this.schemaCountByInput = new long[inputCount]; - this.optimalMemoryAllocation = initialAllocation; + this.optimalMemoryAllocation = optimalMemAllocation; } private String assertionError(String msg){ @@ -208,7 +208,7 @@ public OperatorProfile getProfile() { .setOperatorId(operatorId) // .setSetupNanos(setupNanos) // .setProcessNanos(processingNanos) - .setOptimalMemAllocation(optimalMemoryAllocation) + .setMaxAllocation(optimalMemoryAllocation) .setWaitNanos(waitNanos); if (allocator != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java index 541130b1670..3c12b187496 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java @@ -47,7 +47,7 @@ * fragment is based on the cluster state and provided queue configuration. */ public class DistributedQueueParallelizer extends SimpleParallelizer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class); private final boolean planHasMemory; private final QueryContext queryContext; private final QueryResourceManager rm; @@ -62,23 +62,21 @@ public DistributedQueueParallelizer(boolean memoryPlanning, QueryContext queryCo } // return the memory computed for a physical operator on a drillbitendpoint. + // At this stage buffered operator memory could have been reduced depending upon + // the selected queue limits. public BiFunction getMemory() { return (endpoint, operator) -> { + long operatorsMemory = operator.getMaxAllocation(); if (!planHasMemory) { - final DrillNode drillEndpointNode = DrillNode.create(endpoint); if (operator.isBufferedOperator(queryContext)) { - Long operatorsMemory = operators.get(drillEndpointNode).get(operator); - logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory); - return operatorsMemory; + final DrillNode drillEndpointNode = DrillNode.create(endpoint); + operatorsMemory = operators.get(drillEndpointNode).get(operator); } else { - Long nonBufferedMemory = (long)operator.getCost().getMemoryCost(); - logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory); - return nonBufferedMemory; + operatorsMemory = (long)operator.getCost().getMemoryCost(); } } - else { - return operator.getMaxAllocation(); - } + logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory); + return operatorsMemory; }; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index 3d28067d563..cb4f8d15993 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -18,9 +18,9 @@ package org.apache.drill.exec.planner.fragment; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.util.memory.MemoryAllocationUtilities; import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -167,27 +167,11 @@ public boolean equals(Object obj) { public List getBufferedOperators(QueryContext queryContext) { List bufferedOps = new ArrayList<>(); - root.accept(new BufferedOpFinder(queryContext), bufferedOps); + root.accept(new MemoryAllocationUtilities.BufferedOpFinder(queryContext), bufferedOps); return bufferedOps; } - protected static class BufferedOpFinder extends AbstractPhysicalVisitor, RuntimeException> { - private final QueryContext context; - public BufferedOpFinder(QueryContext queryContext) { - this.context = queryContext; - } - - @Override - public Void visitOp(PhysicalOperator op, List value) - throws RuntimeException { - if (op.isBufferedOperator(context)) { - value.add(op); - } - visitChildren(op, value); - return null; - } - } @Override public String toString() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java index 4593c55f6ad..508036a129a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java @@ -49,13 +49,13 @@ public class MemoryCalculator extends AbstractOpWrapperVisitor>> bufferedOperators; private final QueryContext queryContext; - private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS; + private final long minimum_memory_for_buffer_opers; public MemoryCalculator(PlanningSet planningSet, QueryContext context, long minMemory) { this.planningSet = planningSet; this.bufferedOperators = new HashMap<>(); this.queryContext = context; - this.MINIMUM_MEMORY_FOR_BUFFER_OPERS = minMemory; + this.minimum_memory_for_buffer_opers = minMemory; } // Helper method to compute the minor fragment count per drillbit. This method returns @@ -138,7 +138,7 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) { // The memory estimates of the optimizer are for the whole operator spread across all the // minor fragments. Divide this memory estimation by fragment width to get the memory // requirement per minor fragment. - long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), MINIMUM_MEMORY_FOR_BUFFER_OPERS); + long memoryCostPerMinorFrag = Math.max((long)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()), minimum_memory_for_buffer_opers); Map drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment); Map roots, public class Collector extends AbstractOpWrapperVisitor { - private final Multimap bufferedOperators; + private final Map> bufferedOperators; public Collector() { - this.bufferedOperators = ArrayListMultimap.create(); + this.bufferedOperators = new HashMap<>(); } private void getMinorFragCountPerDrillbit(Wrapper currFragment, PhysicalOperator operator) { for (DrillbitEndpoint endpoint : currFragment.getAssignedEndpoints()) { - bufferedOperators.put(endpoint, operator); + DrillNode node = new DrillNode(endpoint); + bufferedOperators.putIfAbsent(node, new ArrayList<>()); + bufferedOperators.get(node).add(operator); } } @@ -103,10 +107,10 @@ public Void visitOp(PhysicalOperator op, Wrapper fragment) { } public Map> getNodeMap() { - Map> endpointCollectionMap = bufferedOperators.asMap(); + Map> endpointCollectionMap = bufferedOperators; Map> nodeMap = new HashMap<>(); - for (Map.Entry> entry : endpointCollectionMap.entrySet()) { - nodeMap.put(entry.getKey().getAddress(), entry.getValue()); + for (Map.Entry> entry : endpointCollectionMap.entrySet()) { + nodeMap.put(entry.getKey().toString(), entry.getValue()); } return nodeMap; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java index 5b92688f8ea..c3139162936 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java @@ -44,18 +44,18 @@ public class NodeResources { private long memoryInBytes; - private long numVirtualCpu; + private int numVirtualCpu; private static final int CURRENT_VERSION = 1; - public NodeResources(long memoryInBytes, long numVirtualCpu) { + public NodeResources(long memoryInBytes, int numVirtualCpu) { this(CURRENT_VERSION, memoryInBytes, numVirtualCpu); } @JsonCreator public NodeResources(@JsonProperty("version") int version, @JsonProperty("memoryInBytes") long memoryInBytes, - @JsonProperty("numVirtualCpu") long numVirtualCpu) { + @JsonProperty("numVirtualCpu") int numVirtualCpu) { this.version = version; this.memoryInBytes = memoryInBytes; this.numVirtualCpu = numVirtualCpu; @@ -79,7 +79,7 @@ public long getMemoryInGB() { return Math.round(getMemoryInMB() / 1024L); } - public long getNumVirtualCpu() { + public int getNumVirtualCpu() { return numVirtualCpu; } @@ -140,11 +140,11 @@ public static NodeResources create() { return create(0,0); } - public static NodeResources create(long cpu) { + public static NodeResources create(int cpu) { return create(cpu,0); } - public static NodeResources create(long cpu, long memory) { + public static NodeResources create(int cpu, long memory) { return new NodeResources(CURRENT_VERSION, memory, cpu); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/DefaultMemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/DefaultMemoryAllocationUtilities.java index 648276ef201..a7861cd2b35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/DefaultMemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/DefaultMemoryAllocationUtilities.java @@ -17,40 +17,18 @@ */ package org.apache.drill.exec.util.memory; -import java.util.ArrayList; import java.util.List; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; - -public class DefaultMemoryAllocationUtilities { +public class DefaultMemoryAllocationUtilities extends MemoryAllocationUtilities { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultMemoryAllocationUtilities.class); - public static void setupBufferedMemoryAllocations(PhysicalPlan plan, final QueryContext queryContext) { - setupBufferedOpsMemoryAllocations(plan.getProperties().hasResourcePlan, - getBufferedOperators(plan.getSortedOperators(), queryContext), queryContext); - } - - public static List getBufferedOperators(List operators, QueryContext queryContext) { - final List bufferedOpList = new ArrayList<>(); - for (final PhysicalOperator op : operators) { - if (op.isBufferedOperator(queryContext)) { - bufferedOpList.add(op); - } - } - return bufferedOpList; - } - /** * Helper method to setup Memory Allocations *

@@ -105,65 +83,5 @@ public static void setupBufferedOpsMemoryAllocations(boolean planHasMemory, } } - /** - * Compute per-operator memory based on the computed per-node memory, the - * number of operators, and the computed number of fragments (which house - * the operators.) Enforces a floor on the amount of memory per operator. - * - * @param optionManager system option manager - * @param maxAllocPerNode computed query memory per node - * @param opCount number of buffering operators in this query - * @return the per-operator memory - */ - - public static long computeOperatorMemory(OptionSet optionManager, long maxAllocPerNode, int opCount) { - final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE); - final double cpuLoadAverage = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE); - final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth); - final long maxOperatorAlloc = maxAllocPerNode / (opCount * maxWidthPerNode); - logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc); - - // User configurable option to allow forcing minimum memory. - // Ensure that the buffered ops receive the minimum memory needed to make progress. - // Without this, the math might work out to allocate too little memory. - return Math.max(maxOperatorAlloc, - optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP)); - } - - /** - * Per-node memory calculations based on a number of constraints. - *

- * Factored out into a separate method to allow unit testing. - * @param config Drill config - * @param optionManager system options - * @param directMemory amount of direct memory - * @return memory per query per node - */ - - @VisibleForTesting - public static long computeQueryMemory(DrillConfig config, OptionSet optionManager, long directMemory) { - - // Memory computed as a percent of total memory. - - long perQueryMemory = Math.round(directMemory * - optionManager.getOption(ExecConstants.PERCENT_MEMORY_PER_QUERY)); - - // But, must allow at least the amount given explicitly for - // backward compatibility. - - perQueryMemory = Math.max(perQueryMemory, - optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE)); - - // Compute again as either the total direct memory, or the - // configured maximum top-level allocation (10 GB). - - long maxAllocPerNode = Math.min(directMemory, - config.getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC)); - - // Final amount per node per query is the minimum of these two. - - maxAllocPerNode = Math.min(maxAllocPerNode, perQueryMemory); - return maxAllocPerNode; - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/MemoryAllocationUtilities.java new file mode 100644 index 00000000000..c74e3be56a5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/MemoryAllocationUtilities.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util.memory; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.RootAllocatorFactory; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.server.options.OptionSet; +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; + +import java.util.List; + +public class MemoryAllocationUtilities { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class); + /** + * Per-node memory calculations based on a number of constraints. + *

+ * Factored out into a separate method to allow unit testing. + * @param config Drill config + * @param optionManager system options + * @param directMemory amount of direct memory + * @return memory per query per node + */ + + @VisibleForTesting + public static long computeQueryMemory(DrillConfig config, OptionSet optionManager, long directMemory) { + + // Memory computed as a percent of total memory. + + long perQueryMemory = Math.round(directMemory * + optionManager.getOption(ExecConstants.PERCENT_MEMORY_PER_QUERY)); + + // But, must allow at least the amount given explicitly for + // backward compatibility. + + perQueryMemory = Math.max(perQueryMemory, + optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE)); + + // Compute again as either the total direct memory, or the + // configured maximum top-level allocation (10 GB). + + long maxAllocPerNode = Math.min(directMemory, + config.getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC)); + + // Final amount per node per query is the minimum of these two. + + maxAllocPerNode = Math.min(maxAllocPerNode, perQueryMemory); + return maxAllocPerNode; + } + + public static class BufferedOpFinder extends AbstractPhysicalVisitor, RuntimeException> { + private final QueryContext context; + + public BufferedOpFinder(QueryContext queryContext) { + this.context = queryContext; + } + + @Override + public Void visitOp(PhysicalOperator op, List value) + throws RuntimeException { + if (op.isBufferedOperator(context)) { + value.add(op); + } + visitChildren(op, value); + return null; + } + } + + + /** + * Compute per-operator memory based on the computed per-node memory, the + * number of operators, and the computed number of fragments (which house + * the operators.) Enforces a floor on the amount of memory per operator. + * + * @param optionManager system option manager + * @param maxAllocPerNode computed query memory per node + * @param opCount number of buffering operators in this query + * @return the per-operator memory + */ + + public static long computeOperatorMemory(OptionSet optionManager, long maxAllocPerNode, int opCount) { + final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE); + final double cpuLoadAverage = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE); + final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth); + final long maxOperatorAlloc = maxAllocPerNode / (opCount * maxWidthPerNode); + logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc); + + // User configurable option to allow forcing minimum memory. + // Ensure that the buffered ops receive the minimum memory needed to make progress. + // Without this, the math might work out to allocate too little memory. + + return Math.max(maxOperatorAlloc, + optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/ZKQueueMemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/ZKQueueMemoryAllocationUtilities.java index 6d232596bdf..f38d19e9074 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/ZKQueueMemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/ZKQueueMemoryAllocationUtilities.java @@ -17,27 +17,14 @@ */ package org.apache.drill.exec.util.memory; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; -import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; -import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap; -import org.apache.drill.shaded.guava.com.google.common.collect.Multimap; - -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Map; -public class ZKQueueMemoryAllocationUtilities { +public class ZKQueueMemoryAllocationUtilities extends MemoryAllocationUtilities { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKQueueMemoryAllocationUtilities.class); public static void planMemory(QueryContext queryContext, QueryResourceManager rm, Map> nodeMap) { @@ -59,59 +46,6 @@ public static void planMemory(QueryContext queryContext, QueryResourceManager rm } } - public static void getBufferedOps(Multimap map, - QueryWorkUnit.MinorFragmentDefn defn) { - List bufferedOps = getBufferedOps(defn.root()); - if (!bufferedOps.isEmpty()) { - map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps); - } - } - - public static class BufferedOpFinder extends AbstractPhysicalVisitor, RuntimeException> { - @Override - public Void visitOp(PhysicalOperator op, List value) - throws RuntimeException { - if (op.isBufferedOperator(null)) { - value.add(op); - } - visitChildren(op, value); - return null; - } - } - - /** - * Search an individual fragment tree to find any buffered operators it may - * contain. - * - * @param root - * @return - */ - - private static List getBufferedOps(FragmentRoot root) { - List bufferedOps = new ArrayList<>(); - BufferedOpFinder finder = new BufferedOpFinder(); - root.accept(finder, bufferedOps); - return bufferedOps; - } - - /** - * Build a list of external sorts grouped by node. We start with a list of - * minor fragments, each with an endpoint (node). Multiple minor fragments - * may appear on each node, and each minor fragment may have 0, 1 or more - * sorts. - * - * @return - */ - - private static Map> buildBufferedOpMap(QueryWorkUnit work) { - Multimap map = ArrayListMultimap.create(); - getBufferedOps(map, work.getRootFragmentDefn()); - for (QueryWorkUnit.MinorFragmentDefn defn : work.getMinorFragmentDefns()) { - getBufferedOps(map, defn); - } - return map.asMap(); - } - private static int countBufferingOperators(Map> nodeMap) { int width = 0; for (Collection fragSorts : nodeMap.values()) { @@ -191,66 +125,4 @@ else if (alloc < preferredOpMemory) { op.setMaxAllocation(alloc); } } - - /** - * Compute per-operator memory based on the computed per-node memory, the - * number of operators, and the computed number of fragments (which house - * the operators.) Enforces a floor on the amount of memory per operator. - * - * @param optionManager system option manager - * @param maxAllocPerNode computed query memory per node - * @param opCount number of buffering operators in this query - * @return the per-operator memory - */ - - public static long computeOperatorMemory(OptionSet optionManager, long maxAllocPerNode, int opCount) { - final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE); - final double cpuLoadAverage = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE); - final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth); - final long maxOperatorAlloc = maxAllocPerNode / (opCount * maxWidthPerNode); - logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc); - - // User configurable option to allow forcing minimum memory. - // Ensure that the buffered ops receive the minimum memory needed to make progress. - // Without this, the math might work out to allocate too little memory. - - return Math.max(maxOperatorAlloc, - optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP)); - } - - /** - * Per-node memory calculations based on a number of constraints. - *

- * Factored out into a separate method to allow unit testing. - * @param config Drill config - * @param optionManager system options - * @param directMemory amount of direct memory - * @return memory per query per node - */ - - @VisibleForTesting - public static long computeQueryMemory(DrillConfig config, OptionSet optionManager, long directMemory) { - - // Memory computed as a percent of total memory. - - long perQueryMemory = Math.round(directMemory * - optionManager.getOption(ExecConstants.PERCENT_MEMORY_PER_QUERY)); - - // But, must allow at least the amount given explicitly for - // backward compatibility. - - perQueryMemory = Math.max(perQueryMemory, - optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE)); - - // Compute again as either the total direct memory, or the - // configured maximum top-level allocation (10 GB). - - long maxAllocPerNode = Math.min(directMemory, - config.getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC)); - - // Final amount per node per query is the minimum of these two. - - maxAllocPerNode = Math.min(maxAllocPerNode, perQueryMemory); - return maxAllocPerNode; - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index 9a853899a11..956ccf97325 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -132,7 +132,6 @@ private List getFragments(final DrillbitContext dContext, final Ge final QueryWorkUnit queryWorkUnit = parallelizer.generateWorkUnit(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), queryId, queryContext.getOnlineEndpointUUIDs(), rootFragment, queryContext.getSession(), queryContext.getQueryContextInfo()); -// planner.visitPhysicalPlan(queryWorkUnit); queryWorkUnit.applyPlan(dContext.getPlanReader()); fragments.add(queryWorkUnit.getRootFragment()); fragments.addAll(queryWorkUnit.getFragments()); diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java index 63ce995d1e5..c961a748f68 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java @@ -4493,18 +4493,18 @@ org.apache.drill.exec.proto.BitControl.CollectorOrBuilder getCollectorOrBuilder( int index); /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - boolean hasEndpointUUID(); + boolean hasAssignedEndpointUUID(); /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - java.lang.String getEndpointUUID(); + java.lang.String getAssignedEndpointUUID(); /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ com.google.protobuf.ByteString - getEndpointUUIDBytes(); + getAssignedEndpointUUIDBytes(); } /** * Protobuf type {@code exec.bit.control.PlanFragment} @@ -4529,7 +4529,7 @@ private PlanFragment() { memMax_ = 2000000000L; optionsJson_ = ""; collector_ = java.util.Collections.emptyList(); - endpointUUID_ = ""; + assignedEndpointUUID_ = ""; } @java.lang.Override @@ -4680,7 +4680,7 @@ private PlanFragment( case 146: { com.google.protobuf.ByteString bs = input.readBytes(); bitField0_ |= 0x00004000; - endpointUUID_ = bs; + assignedEndpointUUID_ = bs; break; } default: { @@ -5064,19 +5064,19 @@ public org.apache.drill.exec.proto.BitControl.CollectorOrBuilder getCollectorOrB return collector_.get(index); } - public static final int ENDPOINTUUID_FIELD_NUMBER = 18; - private volatile java.lang.Object endpointUUID_; + public static final int ASSIGNEDENDPOINTUUID_FIELD_NUMBER = 18; + private volatile java.lang.Object assignedEndpointUUID_; /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public boolean hasEndpointUUID() { + public boolean hasAssignedEndpointUUID() { return ((bitField0_ & 0x00004000) == 0x00004000); } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public java.lang.String getEndpointUUID() { - java.lang.Object ref = endpointUUID_; + public java.lang.String getAssignedEndpointUUID() { + java.lang.Object ref = assignedEndpointUUID_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -5084,22 +5084,22 @@ public java.lang.String getEndpointUUID() { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - endpointUUID_ = s; + assignedEndpointUUID_ = s; } return s; } } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ public com.google.protobuf.ByteString - getEndpointUUIDBytes() { - java.lang.Object ref = endpointUUID_; + getAssignedEndpointUUIDBytes() { + java.lang.Object ref = assignedEndpointUUID_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - endpointUUID_ = b; + assignedEndpointUUID_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -5166,7 +5166,7 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) output.writeMessage(17, collector_.get(i)); } if (((bitField0_ & 0x00004000) == 0x00004000)) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 18, endpointUUID_); + com.google.protobuf.GeneratedMessageV3.writeString(output, 18, assignedEndpointUUID_); } unknownFields.writeTo(output); } @@ -5236,7 +5236,7 @@ public int getSerializedSize() { .computeMessageSize(17, collector_.get(i)); } if (((bitField0_ & 0x00004000) == 0x00004000)) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(18, endpointUUID_); + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(18, assignedEndpointUUID_); } size += unknownFields.getSerializedSize(); memoizedSize = size; @@ -5334,10 +5334,10 @@ public boolean equals(final java.lang.Object obj) { } result = result && getCollectorList() .equals(other.getCollectorList()); - result = result && (hasEndpointUUID() == other.hasEndpointUUID()); - if (hasEndpointUUID()) { - result = result && getEndpointUUID() - .equals(other.getEndpointUUID()); + result = result && (hasAssignedEndpointUUID() == other.hasAssignedEndpointUUID()); + if (hasAssignedEndpointUUID()) { + result = result && getAssignedEndpointUUID() + .equals(other.getAssignedEndpointUUID()); } result = result && unknownFields.equals(other.unknownFields); return result; @@ -5417,9 +5417,9 @@ public int hashCode() { hash = (37 * hash) + COLLECTOR_FIELD_NUMBER; hash = (53 * hash) + getCollectorList().hashCode(); } - if (hasEndpointUUID()) { - hash = (37 * hash) + ENDPOINTUUID_FIELD_NUMBER; - hash = (53 * hash) + getEndpointUUID().hashCode(); + if (hasAssignedEndpointUUID()) { + hash = (37 * hash) + ASSIGNEDENDPOINTUUID_FIELD_NUMBER; + hash = (53 * hash) + getAssignedEndpointUUID().hashCode(); } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; @@ -5614,7 +5614,7 @@ public Builder clear() { } else { collectorBuilder_.clear(); } - endpointUUID_ = ""; + assignedEndpointUUID_ = ""; bitField0_ = (bitField0_ & ~0x00008000); return this; } @@ -5732,7 +5732,7 @@ public org.apache.drill.exec.proto.BitControl.PlanFragment buildPartial() { if (((from_bitField0_ & 0x00008000) == 0x00008000)) { to_bitField0_ |= 0x00004000; } - result.endpointUUID_ = endpointUUID_; + result.assignedEndpointUUID_ = assignedEndpointUUID_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5854,9 +5854,9 @@ public Builder mergeFrom(org.apache.drill.exec.proto.BitControl.PlanFragment oth } } } - if (other.hasEndpointUUID()) { + if (other.hasAssignedEndpointUUID()) { bitField0_ |= 0x00008000; - endpointUUID_ = other.endpointUUID_; + assignedEndpointUUID_ = other.assignedEndpointUUID_; onChanged(); } this.mergeUnknownFields(other.unknownFields); @@ -7127,24 +7127,24 @@ public org.apache.drill.exec.proto.BitControl.Collector.Builder addCollectorBuil return collectorBuilder_; } - private java.lang.Object endpointUUID_ = ""; + private java.lang.Object assignedEndpointUUID_ = ""; /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public boolean hasEndpointUUID() { + public boolean hasAssignedEndpointUUID() { return ((bitField0_ & 0x00008000) == 0x00008000); } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public java.lang.String getEndpointUUID() { - java.lang.Object ref = endpointUUID_; + public java.lang.String getAssignedEndpointUUID() { + java.lang.Object ref = assignedEndpointUUID_; if (!(ref instanceof java.lang.String)) { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - endpointUUID_ = s; + assignedEndpointUUID_ = s; } return s; } else { @@ -7152,53 +7152,53 @@ public java.lang.String getEndpointUUID() { } } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ public com.google.protobuf.ByteString - getEndpointUUIDBytes() { - java.lang.Object ref = endpointUUID_; + getAssignedEndpointUUIDBytes() { + java.lang.Object ref = assignedEndpointUUID_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - endpointUUID_ = b; + assignedEndpointUUID_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public Builder setEndpointUUID( + public Builder setAssignedEndpointUUID( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00008000; - endpointUUID_ = value; + assignedEndpointUUID_ = value; onChanged(); return this; } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public Builder clearEndpointUUID() { + public Builder clearAssignedEndpointUUID() { bitField0_ = (bitField0_ & ~0x00008000); - endpointUUID_ = getDefaultInstance().getEndpointUUID(); + assignedEndpointUUID_ = getDefaultInstance().getAssignedEndpointUUID(); onChanged(); return this; } /** - * optional string endpointUUID = 18; + * optional string assignedEndpointUUID = 18; */ - public Builder setEndpointUUIDBytes( + public Builder setAssignedEndpointUUIDBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00008000; - endpointUUID_ = value; + assignedEndpointUUID_ = value; onChanged(); return this; } @@ -10872,7 +10872,7 @@ public org.apache.drill.exec.proto.BitControl.FinishedReceiver getDefaultInstanc "c.bit.FragmentHandle\"G\n\023InitializeFragme" + "nts\0220\n\010fragment\030\001 \003(\0132\036.exec.bit.control" + ".PlanFragment\".\n\rCustomMessage\022\014\n\004type\030\001" + - " \001(\005\022\017\n\007message\030\002 \001(\014\"\222\004\n\014PlanFragment\022(" + + " \001(\005\022\017\n\007message\030\002 \001(\014\"\232\004\n\014PlanFragment\022(" + "\n\006handle\030\001 \001(\0132\030.exec.bit.FragmentHandle" + "\022\024\n\014network_cost\030\004 \001(\002\022\020\n\010cpu_cost\030\005 \001(\002" + "\022\021\n\tdisk_cost\030\006 \001(\002\022\023\n\013memory_cost\030\007 \001(\002" + @@ -10885,30 +10885,30 @@ public org.apache.drill.exec.proto.BitControl.FinishedReceiver getDefaultInstanc "ls\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(" + "\0132).exec.bit.control.QueryContextInforma" + "tion\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.contr" + - "ol.Collector\022\024\n\014endpointUUID\030\022 \001(\t\"\210\001\n\tC" + - "ollector\022\"\n\032opposite_major_fragment_id\030\001" + - " \001(\005\022#\n\027incoming_minor_fragment\030\002 \003(\005B\002\020" + - "\001\022\035\n\025supports_out_of_order\030\003 \001(\010\022\023\n\013is_s" + - "pooling\030\004 \001(\010\"w\n\027QueryContextInformation" + - "\022\030\n\020query_start_time\030\001 \001(\003\022\021\n\ttime_zone\030" + - "\002 \001(\005\022\033\n\023default_schema_name\030\003 \001(\t\022\022\n\nse" + - "ssion_id\030\004 \001(\t\"f\n\017WorkQueueStatus\022(\n\010end" + - "point\030\001 \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014q" + - "ueue_length\030\002 \001(\005\022\023\n\013report_time\030\003 \001(\003\"h" + - "\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(\0132\030.e" + - "xec.bit.FragmentHandle\022(\n\006sender\030\002 \001(\0132\030" + - ".exec.bit.FragmentHandle*\206\003\n\007RpcType\022\r\n\t" + - "HANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034\n\030REQ" + - "_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANCEL_FR" + - "AGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007\022\027\n\023R" + - "EQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STATUS\020\t" + - "\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY_CANC" + - "EL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nREQ_CU" + - "STOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n\024RESP" + - "_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STATUS\020\r\022" + - "\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUSTOM\020\022\022" + - "\020\n\014SASL_MESSAGE\020\023B+\n\033org.apache.drill.ex" + - "ec.protoB\nBitControlH\001" + "ol.Collector\022\034\n\024assignedEndpointUUID\030\022 \001" + + "(\t\"\210\001\n\tCollector\022\"\n\032opposite_major_fragm" + + "ent_id\030\001 \001(\005\022#\n\027incoming_minor_fragment\030" + + "\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_order\030\003 \001(\010" + + "\022\023\n\013is_spooling\030\004 \001(\010\"w\n\027QueryContextInf" + + "ormation\022\030\n\020query_start_time\030\001 \001(\003\022\021\n\tti" + + "me_zone\030\002 \001(\005\022\033\n\023default_schema_name\030\003 \001" + + "(\t\022\022\n\nsession_id\030\004 \001(\t\"f\n\017WorkQueueStatu" + + "s\022(\n\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndpo" + + "int\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_time" + + "\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030\001" + + " \001(\0132\030.exec.bit.FragmentHandle\022(\n\006sender" + + "\030\002 \001(\0132\030.exec.bit.FragmentHandle*\206\003\n\007Rpc" + + "Type\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020" + + "\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_C" + + "ANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHE" + + "D\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_" + + "STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QU" + + "ERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016" + + "\n\nREQ_CUSTOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013" + + "\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_S" + + "TATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_C" + + "USTOM\020\022\022\020\n\014SASL_MESSAGE\020\023B+\n\033org.apache." + + "drill.exec.protoB\nBitControlH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -10960,7 +10960,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_exec_bit_control_PlanFragment_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_exec_bit_control_PlanFragment_descriptor, - new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "LeafFragment", "Assignment", "Foreman", "MemInitial", "MemMax", "Credentials", "OptionsJson", "Context", "Collector", "EndpointUUID", }); + new java.lang.String[] { "Handle", "NetworkCost", "CpuCost", "DiskCost", "MemoryCost", "FragmentJson", "LeafFragment", "Assignment", "Foreman", "MemInitial", "MemMax", "Credentials", "OptionsJson", "Context", "Collector", "AssignedEndpointUUID", }); internal_static_exec_bit_control_Collector_descriptor = getDescriptor().getMessageTypes().get(6); internal_static_exec_bit_control_Collector_fieldAccessorTable = new diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java index 735549f1d5b..e4c67be98df 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java @@ -672,8 +672,8 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex for(org.apache.drill.exec.proto.BitControl.Collector collector : message.getCollectorList()) output.writeObject(17, collector, org.apache.drill.exec.proto.SchemaBitControl.Collector.WRITE, true); - if(message.hasEndpointUUID()) - output.writeString(18, message.getEndpointUUID(), false); + if(message.hasAssignedEndpointUUID()) + output.writeString(18, message.getAssignedEndpointUUID(), false); } public boolean isInitialized(org.apache.drill.exec.proto.BitControl.PlanFragment message) { @@ -765,7 +765,7 @@ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.ex break; case 18: - builder.setEndpointUUID(input.readString()); + builder.setAssignedEndpointUUID(input.readString()); break; default: input.handleUnknownField(number, this); @@ -822,7 +822,7 @@ public static java.lang.String getFieldName(int number) case 15: return "optionsJson"; case 16: return "context"; case 17: return "collector"; - case 18: return "endpointUUID"; + case 18: return "assignedEndpointUUID"; default: return null; } } @@ -849,7 +849,7 @@ public static int getFieldNumber(java.lang.String name) fieldMap.put("optionsJson", 15); fieldMap.put("context", 16); fieldMap.put("collector", 17); - fieldMap.put("endpointUUID", 18); + fieldMap.put("assignedEndpointUUID", 18); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java index 4babfb18148..e9c30ac2390 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java @@ -2377,8 +2377,8 @@ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.ex if(message.hasWaitNanos()) output.writeInt64(9, message.getWaitNanos(), false); - if(message.hasOptimalMemAllocation()) - output.writeInt64(10, message.getOptimalMemAllocation(), false); + if(message.hasMaxAllocation()) + output.writeInt64(10, message.getMaxAllocation(), false); } public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.OperatorProfile message) { @@ -2445,7 +2445,7 @@ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.ex builder.setWaitNanos(input.readInt64()); break; case 10: - builder.setOptimalMemAllocation(input.readInt64()); + builder.setMaxAllocation(input.readInt64()); break; default: input.handleUnknownField(number, this); @@ -2495,7 +2495,7 @@ public static java.lang.String getFieldName(int number) case 7: return "peakLocalMemoryAllocated"; case 8: return "metric"; case 9: return "waitNanos"; - case 10: return "optimalMemAllocation"; + case 10: return "maxAllocation"; default: return null; } } @@ -2515,7 +2515,7 @@ public static int getFieldNumber(java.lang.String name) fieldMap.put("peakLocalMemoryAllocated", 7); fieldMap.put("metric", 8); fieldMap.put("waitNanos", 9); - fieldMap.put("optimalMemAllocation", 10); + fieldMap.put("maxAllocation", 10); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 452d90e3bda..e5fe2539f14 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -22365,13 +22365,13 @@ org.apache.drill.exec.proto.UserBitShared.MetricValueOrBuilder getMetricOrBuilde long getWaitNanos(); /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - boolean hasOptimalMemAllocation(); + boolean hasMaxAllocation(); /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - long getOptimalMemAllocation(); + long getMaxAllocation(); } /** * Protobuf type {@code exec.shared.OperatorProfile} @@ -22394,7 +22394,7 @@ private OperatorProfile() { peakLocalMemoryAllocated_ = 0L; metric_ = java.util.Collections.emptyList(); waitNanos_ = 0L; - optimalMemAllocation_ = 0L; + maxAllocation_ = 0L; } @java.lang.Override @@ -22471,7 +22471,7 @@ private OperatorProfile( } case 80: { bitField0_ |= 0x00000040; - optimalMemAllocation_ = input.readInt64(); + maxAllocation_ = input.readInt64(); break; } default: { @@ -22673,19 +22673,19 @@ public long getWaitNanos() { return waitNanos_; } - public static final int OPTIMAL_MEM_ALLOCATION_FIELD_NUMBER = 10; - private long optimalMemAllocation_; + public static final int MAX_ALLOCATION_FIELD_NUMBER = 10; + private long maxAllocation_; /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - public boolean hasOptimalMemAllocation() { + public boolean hasMaxAllocation() { return ((bitField0_ & 0x00000040) == 0x00000040); } /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - public long getOptimalMemAllocation() { - return optimalMemAllocation_; + public long getMaxAllocation() { + return maxAllocation_; } private byte memoizedIsInitialized = -1; @@ -22727,7 +22727,7 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) output.writeInt64(9, waitNanos_); } if (((bitField0_ & 0x00000040) == 0x00000040)) { - output.writeInt64(10, optimalMemAllocation_); + output.writeInt64(10, maxAllocation_); } unknownFields.writeTo(output); } @@ -22772,7 +22772,7 @@ public int getSerializedSize() { } if (((bitField0_ & 0x00000040) == 0x00000040)) { size += com.google.protobuf.CodedOutputStream - .computeInt64Size(10, optimalMemAllocation_); + .computeInt64Size(10, maxAllocation_); } size += unknownFields.getSerializedSize(); memoizedSize = size; @@ -22824,10 +22824,10 @@ public boolean equals(final java.lang.Object obj) { result = result && (getWaitNanos() == other.getWaitNanos()); } - result = result && (hasOptimalMemAllocation() == other.hasOptimalMemAllocation()); - if (hasOptimalMemAllocation()) { - result = result && (getOptimalMemAllocation() - == other.getOptimalMemAllocation()); + result = result && (hasMaxAllocation() == other.hasMaxAllocation()); + if (hasMaxAllocation()) { + result = result && (getMaxAllocation() + == other.getMaxAllocation()); } result = result && unknownFields.equals(other.unknownFields); return result; @@ -22876,10 +22876,10 @@ public int hashCode() { hash = (53 * hash) + com.google.protobuf.Internal.hashLong( getWaitNanos()); } - if (hasOptimalMemAllocation()) { - hash = (37 * hash) + OPTIMAL_MEM_ALLOCATION_FIELD_NUMBER; + if (hasMaxAllocation()) { + hash = (37 * hash) + MAX_ALLOCATION_FIELD_NUMBER; hash = (53 * hash) + com.google.protobuf.Internal.hashLong( - getOptimalMemAllocation()); + getMaxAllocation()); } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; @@ -23040,7 +23040,7 @@ public Builder clear() { } waitNanos_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); - optimalMemAllocation_ = 0L; + maxAllocation_ = 0L; bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -23115,7 +23115,7 @@ public org.apache.drill.exec.proto.UserBitShared.OperatorProfile buildPartial() if (((from_bitField0_ & 0x00000100) == 0x00000100)) { to_bitField0_ |= 0x00000040; } - result.optimalMemAllocation_ = optimalMemAllocation_; + result.maxAllocation_ = maxAllocation_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -23235,8 +23235,8 @@ public Builder mergeFrom(org.apache.drill.exec.proto.UserBitShared.OperatorProfi if (other.hasWaitNanos()) { setWaitNanos(other.getWaitNanos()); } - if (other.hasOptimalMemAllocation()) { - setOptimalMemAllocation(other.getOptimalMemAllocation()); + if (other.hasMaxAllocation()) { + setMaxAllocation(other.getMaxAllocation()); } this.mergeUnknownFields(other.unknownFields); onChanged(); @@ -23940,34 +23940,34 @@ public Builder clearWaitNanos() { return this; } - private long optimalMemAllocation_ ; + private long maxAllocation_ ; /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - public boolean hasOptimalMemAllocation() { + public boolean hasMaxAllocation() { return ((bitField0_ & 0x00000100) == 0x00000100); } /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - public long getOptimalMemAllocation() { - return optimalMemAllocation_; + public long getMaxAllocation() { + return maxAllocation_; } /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - public Builder setOptimalMemAllocation(long value) { + public Builder setMaxAllocation(long value) { bitField0_ |= 0x00000100; - optimalMemAllocation_ = value; + maxAllocation_ = value; onChanged(); return this; } /** - * optional int64 optimal_mem_allocation = 10; + * optional int64 max_allocation = 10; */ - public Builder clearOptimalMemAllocation() { + public Builder clearMaxAllocation() { bitField0_ = (bitField0_ & ~0x00000100); - optimalMemAllocation_ = 0L; + maxAllocation_ = 0L; onChanged(); return this; } @@ -27946,65 +27946,65 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" + "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" + "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 " + - "\001(\003\"\237\002\n\017OperatorProfile\0221\n\rinput_profile" + + "\001(\003\"\227\002\n\017OperatorProfile\0221\n\rinput_profile" + "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" + "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" + "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" + "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" + "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" + - "e\022\022\n\nwait_nanos\030\t \001(\003\022\036\n\026optimal_mem_all" + - "ocation\030\n \001(\003\"B\n\rStreamProfile\022\017\n\007record" + - "s\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(" + - "\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nl" + - "ong_value\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n" + - "\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar" + - "\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_signat" + - "ure\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 " + - "\001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec" + - ".shared.SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_" + - "CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQue" + - "ryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL" + - "\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020" + - "\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAI" + - "TING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISH" + - "ED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCE" + - "LLATION_REQUESTED\020\006*\374\t\n\020CoreOperatorType" + - "\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020" + - "\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHAS" + - "H_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITI" + - "ON_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIV" + - "ER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PR" + - "OJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\032\n\026RANGE" + - "_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELEC" + - "TION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGRE" + - "GATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020" + - "\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n" + - "\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SC" + - "AN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_" + - "SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB" + - "_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCA" + - "N\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SU" + - "B_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCE" + - "R_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WIND" + - "OW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_S" + - "CAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SC" + - "AN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014" + - "LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL" + - "_NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC" + - "_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SU" + - "B_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRI" + - "TER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WR" + - "ITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_" + - "SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PAR" + - "TITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016R" + - "UNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSL" + - "OG_SUB_SCAN\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022" + - "\020\n\014UNPIVOT_MAPS\020<\022\024\n\020STATISTICS_MERGE\020=\022" + - "\021\n\rLTSV_SUB_SCAN\020>*g\n\nSaslStatus\022\020\n\014SASL" + - "_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PR" + - "OGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILE" + - "D\020\004B.\n\033org.apache.drill.exec.protoB\rUser" + - "BitSharedH\001" + "e\022\022\n\nwait_nanos\030\t \001(\003\022\026\n\016max_allocation\030" + + "\n \001(\003\"B\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022" + + "\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013Met" + + "ricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_valu" + + "e\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n\010Registr" + + "y\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar\"/\n\003Jar\022" + + "\014\n\004name\030\001 \001(\t\022\032\n\022function_signature\030\002 \003(" + + "\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004d" + + "ata\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec.shared." + + "SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020" + + "\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007" + + "\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEX" + + "ECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207\001\n\rFr" + + "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" + + "OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t" + + "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" + + "REQUESTED\020\006*\374\t\n\020CoreOperatorType\022\021\n\rSING" + + "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" + + "TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" + + "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" + + "R\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030" + + "ORDERED_PARTITION_SENDER\020\t\022\013\n\007PROJECT\020\n\022" + + "\026\n\022UNORDERED_RECEIVER\020\013\022\032\n\026RANGE_PARTITI" + + "ON_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VEC" + + "TOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016" + + "\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRA" + + "CE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET" + + "_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021" + + "SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022" + + "\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022" + + "\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJ" + + "SON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036" + + "\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUM" + + "ER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020" + + "NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n" + + "\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\r" + + "KUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014LATERAL_" + + "JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL_NATIVE_" + + "PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC_SCAN\020,\022" + + "\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SUB_SCAN\020." + + "\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRITER\0200\022\026\n" + + "\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRITER\0202\022\026" + + "\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_SUB_SCAN" + + "\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PARTITION_L" + + "IMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RUNTIME_F" + + "ILTER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSLOG_SUB_S" + + "CAN\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022\020\n\014UNPIV" + + "OT_MAPS\020<\022\024\n\020STATISTICS_MERGE\020=\022\021\n\rLTSV_" + + "SUB_SCAN\020>*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN" + + "\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002" + + "\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033o" + + "rg.apache.drill.exec.protoB\rUserBitShare" + + "dH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -28122,7 +28122,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_exec_shared_OperatorProfile_descriptor, - new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "PeakLocalMemoryAllocated", "Metric", "WaitNanos", "OptimalMemAllocation", }); + new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "PeakLocalMemoryAllocated", "Metric", "WaitNanos", "MaxAllocation", }); internal_static_exec_shared_StreamProfile_descriptor = getDescriptor().getMessageTypes().get(17); internal_static_exec_shared_StreamProfile_fieldAccessorTable = new diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java index 224214a69dd..a065a6cb8e0 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java @@ -57,7 +57,7 @@ public static OperatorProfile getDefaultInstance() private long peakLocalMemoryAllocated; private List metric; private long waitNanos; - private long optimalMemAllocation; + private long maxAllocation; public OperatorProfile() { @@ -170,16 +170,16 @@ public OperatorProfile setWaitNanos(long waitNanos) return this; } - // optimalMemAllocation + // maxAllocation - public long getOptimalMemAllocation() + public long getMaxAllocation() { - return optimalMemAllocation; + return maxAllocation; } - public OperatorProfile setOptimalMemAllocation(long optimalMemAllocation) + public OperatorProfile setMaxAllocation(long maxAllocation) { - this.optimalMemAllocation = optimalMemAllocation; + this.maxAllocation = maxAllocation; return this; } @@ -268,7 +268,7 @@ public void mergeFrom(Input input, OperatorProfile message) throws IOException message.waitNanos = input.readInt64(); break; case 10: - message.optimalMemAllocation = input.readInt64(); + message.maxAllocation = input.readInt64(); break; default: input.handleUnknownField(number, this); @@ -317,8 +317,8 @@ public void writeTo(Output output, OperatorProfile message) throws IOException if(message.waitNanos != 0) output.writeInt64(9, message.waitNanos, false); - if(message.optimalMemAllocation != 0) - output.writeInt64(10, message.optimalMemAllocation, false); + if(message.maxAllocation != 0) + output.writeInt64(10, message.maxAllocation, false); } public String getFieldName(int number) @@ -333,7 +333,7 @@ public String getFieldName(int number) case 7: return "peakLocalMemoryAllocated"; case 8: return "metric"; case 9: return "waitNanos"; - case 10: return "optimalMemAllocation"; + case 10: return "maxAllocation"; default: return null; } } @@ -355,7 +355,7 @@ public int getFieldNumber(String name) __fieldMap.put("peakLocalMemoryAllocated", 7); __fieldMap.put("metric", 8); __fieldMap.put("waitNanos", 9); - __fieldMap.put("optimalMemAllocation", 10); + __fieldMap.put("maxAllocation", 10); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/PlanFragment.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/PlanFragment.java index 7214f15882a..9afc892a2ad 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/PlanFragment.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/PlanFragment.java @@ -66,7 +66,7 @@ public static PlanFragment getDefaultInstance() private String optionsJson; private QueryContextInformation context; private List collector; - private String endpointUUID; + private String assignedEndpointUUID; public PlanFragment() { @@ -270,16 +270,16 @@ public PlanFragment setCollectorList(List collector) return this; } - // endpointUUID + // assignedEndpointUUID - public String getEndpointUUID() + public String getAssignedEndpointUUID() { - return endpointUUID; + return assignedEndpointUUID; } - public PlanFragment setEndpointUUID(String endpointUUID) + public PlanFragment setAssignedEndpointUUID(String assignedEndpointUUID) { - this.endpointUUID = endpointUUID; + this.assignedEndpointUUID = assignedEndpointUUID; return this; } @@ -391,7 +391,7 @@ public void mergeFrom(Input input, PlanFragment message) throws IOException break; case 18: - message.endpointUUID = input.readString(); + message.assignedEndpointUUID = input.readString(); break; default: input.handleUnknownField(number, this); @@ -459,8 +459,8 @@ public void writeTo(Output output, PlanFragment message) throws IOException } - if(message.endpointUUID != null) - output.writeString(18, message.endpointUUID, false); + if(message.assignedEndpointUUID != null) + output.writeString(18, message.assignedEndpointUUID, false); } public String getFieldName(int number) @@ -482,7 +482,7 @@ public String getFieldName(int number) case 15: return "optionsJson"; case 16: return "context"; case 17: return "collector"; - case 18: return "endpointUUID"; + case 18: return "assignedEndpointUUID"; default: return null; } } @@ -511,7 +511,7 @@ public int getFieldNumber(String name) __fieldMap.put("optionsJson", 15); __fieldMap.put("context", 16); __fieldMap.put("collector", 17); - __fieldMap.put("endpointUUID", 18); + __fieldMap.put("assignedEndpointUUID", 18); } } diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto index a30b0606017..ae536bce343 100644 --- a/protocol/src/main/protobuf/BitControl.proto +++ b/protocol/src/main/protobuf/BitControl.proto @@ -79,7 +79,7 @@ message PlanFragment { optional string options_json = 15; optional QueryContextInformation context = 16; repeated Collector collector = 17; - optional string endpointUUID = 18; + optional string assignedEndpointUUID = 18; } message Collector { diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 46d28eb4146..090b4cd2e41 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -268,7 +268,7 @@ message OperatorProfile { optional int64 peak_local_memory_allocated = 7; repeated MetricValue metric = 8; optional int64 wait_nanos = 9; - optional int64 optimal_mem_allocation = 10; + optional int64 max_allocation = 10; } message StreamProfile {