1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.giraph.graph; 19 20 import org.apache.hadoop.conf.Configuration; 21 import org.apache.hadoop.io.Writable; 22 import org.apache.hadoop.io.WritableComparable; 23 import org.apache.giraph.partition.PartitionOwner; 24 import com.google.common.collect.Maps; 25 import java.util.Map; 26 27 28 /** Utility class to manage data transfers from 29 * a local worker reading InputSplits. 30 * Currently, this class measures # of vertices and edges 31 * per outgoing Collection of graph data (destined for a 32 * particular Partition and remote worker node, preselected 33 * by the master.) 34 * 35 * TODO: implement defaults and configurable options for 36 * measuring the size of input <V> or <E> data 37 * per read vertex, and setting limits on totals per outgoing 38 * graph data Collection etc. (See GIRAPH-260) 39 */ 40 public class GiraphTransferRegulator { 41 /** Maximum vertices to read from an InputSplit locally that are 42 * to be routed to a remote worker, before sending them. */ 43 public static final String MAX_VERTICES_PER_TRANSFER = 44 "giraph.maxVerticesPerTransfer"; 45 /** Default maximum number of vertices per 46 * temp partition before sending. */ 47 public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000; 48 /** 49 * Maximum edges to read from an InputSplit locally that are 50 * to be routed to a remote worker, before sending them. 51 */ 52 public static final String MAX_EDGES_PER_TRANSFER = 53 "giraph.maxEdgesPerTransfer"; 54 /** Default maximum number of vertices per 55 * temp partition before sending. */ 56 public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000; 57 58 /** Internal state to measure when 59 * the next data transfer of a Collection 60 * of vertices read by the local worker that 61 * owns this regulator is ready to be sent 62 * to the remote worker node that the master 63 * has assigned the vertices to */ 64 private Map<Integer, Integer> edgeAccumulator; 65 66 /** Internal state to measure when 67 * the next data transfer of a Collection 68 * of vertices read by the local worker that 69 * owns this regulator is ready to be sent 70 * to the remote worker node that the master 71 * has assigned the vertices to */ 72 private Map<Integer, Integer> vertexAccumulator; 73 74 /** Number of vertices per data transfer */ 75 private final int maxVerticesPerTransfer; 76 77 /** Number of edges per data transfer */ 78 private final int maxEdgesPerTransfer; 79 80 /** Vertex count total for this InputSplit */ 81 private long totalVertexCount; 82 83 /** Edge count total for this InputSplit */ 84 private long totalEdgeCount; 85 86 /** Default constructor 87 * @param conf the Configuration for this job 88 */ 89 public GiraphTransferRegulator(Configuration conf) { 90 vertexAccumulator = Maps.<Integer, Integer>newHashMap(); 91 edgeAccumulator = Maps.<Integer, Integer>newHashMap(); 92 maxVerticesPerTransfer = conf.getInt( 93 MAX_VERTICES_PER_TRANSFER, 94 MAX_VERTICES_PER_TRANSFER_DEFAULT); 95 maxEdgesPerTransfer = conf.getInt( 96 MAX_EDGES_PER_TRANSFER, 97 MAX_EDGES_PER_TRANSFER_DEFAULT); 98 totalEdgeCount = 0; 99 totalVertexCount = 0; 100 } 101 102 /** Is this outbound data Collection full, 103 * and ready to transfer? 104 * @param owner the partition owner for the outbound data 105 * @return 'true' if the temp partition data is ready to transfer 106 */ 107 public boolean transferThisPartition(PartitionOwner owner) { 108 final int partitionId = owner.getPartitionId(); 109 if (getEdgesForPartition(partitionId) >= 110 maxEdgesPerTransfer || 111 getVerticesForPartition(partitionId) >= 112 maxVerticesPerTransfer) { 113 vertexAccumulator.put(partitionId, 0); 114 edgeAccumulator.put(partitionId, 0); 115 return true; 116 } 117 return false; 118 } 119 120 /** get current vertex count for a given Collection of 121 * data soon to be transfered to its permanent home. 122 * @param partId the partition id to check the count on. 123 * @return the count of vertices. 124 */ 125 private int getVerticesForPartition(final int partId) { 126 return vertexAccumulator.get(partId) == null ? 127 0 : vertexAccumulator.get(partId); 128 } 129 130 /** get current edge count for a given Collection of 131 * data soon to be transfered to its permanent home. 132 * @param partId the partition id to check the count on. 133 * @return the count of edges. 134 */ 135 private int getEdgesForPartition(final int partId) { 136 return edgeAccumulator.get(partId) == null ? 137 0 : edgeAccumulator.get(partId); 138 } 139 140 /** Clear storage to reset for reading new InputSplit */ 141 public void clearCounters() { 142 totalEdgeCount = 0; 143 totalVertexCount = 0; 144 vertexAccumulator.clear(); 145 edgeAccumulator.clear(); 146 } 147 148 /** Increment V & E counts for new vertex read, store values 149 * for that outgoing _temporary_ Partition, which shares the 150 * Partition ID for the actual remote Partition the collection 151 * will eventually be processed in. 152 * @param partitionOwner the owner of the Partition this data 153 * will eventually belong to. 154 * @param vertex the vertex to extract counts from. 155 * @param <I> the vertex id type. 156 * @param <V> the vertex value type. 157 * @param <E> the edge value type. 158 */ 159 public <I extends WritableComparable, V extends Writable, 160 E extends Writable> void 161 incrementCounters(PartitionOwner partitionOwner, 162 Vertex<I, V, E> vertex) { 163 final int id = partitionOwner.getPartitionId(); 164 // vertex counts 165 vertexAccumulator 166 .put(id, getVerticesForPartition(id) + 1); 167 totalVertexCount++; 168 // edge counts 169 totalEdgeCount += vertex.getNumEdges(); 170 edgeAccumulator.put(id, getEdgesForPartition(id) + 171 vertex.getNumEdges()); 172 } 173 174 /** Getter for MAX edge count to initiate a transfer 175 * @return max edge count per transfer */ 176 public long getMaxEdgesPerTransfer() { 177 return maxEdgesPerTransfer; 178 } 179 180 /** Getter for MAX vertex count to initiate a transfer 181 * @return max edge count per transfer */ 182 public long getMaxVerticesPerTransfer() { 183 return maxVerticesPerTransfer; 184 } 185 186 /** Getter for total edge count for the current InputSplit 187 * @return the # of total edges counted in this InputSplit */ 188 public long getTotalEdges() { 189 return totalEdgeCount; 190 } 191 192 /** Getter for total vetex count for the current InputSplit 193 * @return the total # of vertices counted in this InputSplit */ 194 public long getTotalVertices() { 195 return totalVertexCount; 196 } 197 } 198