1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.graph;
19
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.io.Writable;
22 import org.apache.hadoop.io.WritableComparable;
23 import org.apache.giraph.partition.PartitionOwner;
24 import com.google.common.collect.Maps;
25 import java.util.Map;
26
27
28 /** Utility class to manage data transfers from
29 * a local worker reading InputSplits.
30 * Currently, this class measures # of vertices and edges
31 * per outgoing Collection of graph data (destined for a
32 * particular Partition and remote worker node, preselected
33 * by the master.)
34 *
35 * TODO: implement defaults and configurable options for
36 * measuring the size of input <V> or <E> data
37 * per read vertex, and setting limits on totals per outgoing
38 * graph data Collection etc. (See GIRAPH-260)
39 */
40 public class GiraphTransferRegulator {
41 /** Maximum vertices to read from an InputSplit locally that are
42 * to be routed to a remote worker, before sending them. */
43 public static final String MAX_VERTICES_PER_TRANSFER =
44 "giraph.maxVerticesPerTransfer";
45 /** Default maximum number of vertices per
46 * temp partition before sending. */
47 public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000;
48 /**
49 * Maximum edges to read from an InputSplit locally that are
50 * to be routed to a remote worker, before sending them.
51 */
52 public static final String MAX_EDGES_PER_TRANSFER =
53 "giraph.maxEdgesPerTransfer";
54 /** Default maximum number of vertices per
55 * temp partition before sending. */
56 public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000;
57
58 /** Internal state to measure when
59 * the next data transfer of a Collection
60 * of vertices read by the local worker that
61 * owns this regulator is ready to be sent
62 * to the remote worker node that the master
63 * has assigned the vertices to */
64 private Map<Integer, Integer> edgeAccumulator;
65
66 /** Internal state to measure when
67 * the next data transfer of a Collection
68 * of vertices read by the local worker that
69 * owns this regulator is ready to be sent
70 * to the remote worker node that the master
71 * has assigned the vertices to */
72 private Map<Integer, Integer> vertexAccumulator;
73
74 /** Number of vertices per data transfer */
75 private final int maxVerticesPerTransfer;
76
77 /** Number of edges per data transfer */
78 private final int maxEdgesPerTransfer;
79
80 /** Vertex count total for this InputSplit */
81 private long totalVertexCount;
82
83 /** Edge count total for this InputSplit */
84 private long totalEdgeCount;
85
86 /** Default constructor
87 * @param conf the Configuration for this job
88 */
89 public GiraphTransferRegulator(Configuration conf) {
90 vertexAccumulator = Maps.<Integer, Integer>newHashMap();
91 edgeAccumulator = Maps.<Integer, Integer>newHashMap();
92 maxVerticesPerTransfer = conf.getInt(
93 MAX_VERTICES_PER_TRANSFER,
94 MAX_VERTICES_PER_TRANSFER_DEFAULT);
95 maxEdgesPerTransfer = conf.getInt(
96 MAX_EDGES_PER_TRANSFER,
97 MAX_EDGES_PER_TRANSFER_DEFAULT);
98 totalEdgeCount = 0;
99 totalVertexCount = 0;
100 }
101
102 /** Is this outbound data Collection full,
103 * and ready to transfer?
104 * @param owner the partition owner for the outbound data
105 * @return 'true' if the temp partition data is ready to transfer
106 */
107 public boolean transferThisPartition(PartitionOwner owner) {
108 final int partitionId = owner.getPartitionId();
109 if (getEdgesForPartition(partitionId) >=
110 maxEdgesPerTransfer ||
111 getVerticesForPartition(partitionId) >=
112 maxVerticesPerTransfer) {
113 vertexAccumulator.put(partitionId, 0);
114 edgeAccumulator.put(partitionId, 0);
115 return true;
116 }
117 return false;
118 }
119
120 /** get current vertex count for a given Collection of
121 * data soon to be transfered to its permanent home.
122 * @param partId the partition id to check the count on.
123 * @return the count of vertices.
124 */
125 private int getVerticesForPartition(final int partId) {
126 return vertexAccumulator.get(partId) == null ?
127 0 : vertexAccumulator.get(partId);
128 }
129
130 /** get current edge count for a given Collection of
131 * data soon to be transfered to its permanent home.
132 * @param partId the partition id to check the count on.
133 * @return the count of edges.
134 */
135 private int getEdgesForPartition(final int partId) {
136 return edgeAccumulator.get(partId) == null ?
137 0 : edgeAccumulator.get(partId);
138 }
139
140 /** Clear storage to reset for reading new InputSplit */
141 public void clearCounters() {
142 totalEdgeCount = 0;
143 totalVertexCount = 0;
144 vertexAccumulator.clear();
145 edgeAccumulator.clear();
146 }
147
148 /** Increment V & E counts for new vertex read, store values
149 * for that outgoing _temporary_ Partition, which shares the
150 * Partition ID for the actual remote Partition the collection
151 * will eventually be processed in.
152 * @param partitionOwner the owner of the Partition this data
153 * will eventually belong to.
154 * @param vertex the vertex to extract counts from.
155 * @param <I> the vertex id type.
156 * @param <V> the vertex value type.
157 * @param <E> the edge value type.
158 */
159 public <I extends WritableComparable, V extends Writable,
160 E extends Writable> void
161 incrementCounters(PartitionOwner partitionOwner,
162 Vertex<I, V, E> vertex) {
163 final int id = partitionOwner.getPartitionId();
164 // vertex counts
165 vertexAccumulator
166 .put(id, getVerticesForPartition(id) + 1);
167 totalVertexCount++;
168 // edge counts
169 totalEdgeCount += vertex.getNumEdges();
170 edgeAccumulator.put(id, getEdgesForPartition(id) +
171 vertex.getNumEdges());
172 }
173
174 /** Getter for MAX edge count to initiate a transfer
175 * @return max edge count per transfer */
176 public long getMaxEdgesPerTransfer() {
177 return maxEdgesPerTransfer;
178 }
179
180 /** Getter for MAX vertex count to initiate a transfer
181 * @return max edge count per transfer */
182 public long getMaxVerticesPerTransfer() {
183 return maxVerticesPerTransfer;
184 }
185
186 /** Getter for total edge count for the current InputSplit
187 * @return the # of total edges counted in this InputSplit */
188 public long getTotalEdges() {
189 return totalEdgeCount;
190 }
191
192 /** Getter for total vetex count for the current InputSplit
193 * @return the total # of vertices counted in this InputSplit */
194 public long getTotalVertices() {
195 return totalVertexCount;
196 }
197 }
198