This project has been retired. For details, please refer to its Attic page.
GiraphTransferRegulator xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.graph;
19  
20  import org.apache.hadoop.conf.Configuration;
21  import org.apache.hadoop.io.Writable;
22  import org.apache.hadoop.io.WritableComparable;
23  import org.apache.giraph.partition.PartitionOwner;
24  import com.google.common.collect.Maps;
25  import java.util.Map;
26  
27  
28  /** Utility class to manage data transfers from
29   * a local worker reading InputSplits.
30   * Currently, this class measures # of vertices and edges
31   * per outgoing Collection of graph data (destined for a
32   * particular Partition and remote worker node, preselected
33   * by the master.)
34   *
35   * TODO: implement defaults and configurable options for
36   * measuring the size of input <V> or <E> data
37   * per read vertex, and setting limits on totals per outgoing
38   * graph data Collection etc. (See GIRAPH-260)
39   */
40  public class GiraphTransferRegulator {
41    /** Maximum vertices to read from an InputSplit locally that are
42     * to be routed to a remote worker, before sending them. */
43    public static final String MAX_VERTICES_PER_TRANSFER =
44      "giraph.maxVerticesPerTransfer";
45    /** Default maximum number of vertices per
46     * temp partition before sending. */
47    public static final int MAX_VERTICES_PER_TRANSFER_DEFAULT = 10000;
48    /**
49     * Maximum edges to read from an InputSplit locally that are
50     * to be routed to a remote worker, before sending them.
51     */
52    public static final String MAX_EDGES_PER_TRANSFER =
53      "giraph.maxEdgesPerTransfer";
54    /** Default maximum number of vertices per
55     * temp partition before sending. */
56    public static final int MAX_EDGES_PER_TRANSFER_DEFAULT = 80000;
57  
58    /** Internal state to measure when
59     * the next data transfer of a Collection
60     * of vertices read by the local worker that
61     * owns this regulator is ready to be sent
62     * to the remote worker node that the master
63     * has assigned the vertices to */
64    private Map<Integer, Integer> edgeAccumulator;
65  
66    /** Internal state to measure when
67     * the next data transfer of a Collection
68     * of vertices read by the local worker that
69     * owns this regulator is ready to be sent
70     * to the remote worker node that the master
71     * has assigned the vertices to */
72    private Map<Integer, Integer> vertexAccumulator;
73  
74    /** Number of vertices per data transfer */
75    private final int maxVerticesPerTransfer;
76  
77    /** Number of edges per data transfer */
78    private final int maxEdgesPerTransfer;
79  
80    /** Vertex count total for this InputSplit */
81    private long totalVertexCount;
82  
83    /** Edge count total for this InputSplit */
84    private long totalEdgeCount;
85  
86    /** Default constructor
87     * @param conf the Configuration for this job
88     */
89    public GiraphTransferRegulator(Configuration conf) {
90      vertexAccumulator = Maps.<Integer, Integer>newHashMap();
91      edgeAccumulator = Maps.<Integer, Integer>newHashMap();
92      maxVerticesPerTransfer = conf.getInt(
93          MAX_VERTICES_PER_TRANSFER,
94          MAX_VERTICES_PER_TRANSFER_DEFAULT);
95      maxEdgesPerTransfer = conf.getInt(
96          MAX_EDGES_PER_TRANSFER,
97          MAX_EDGES_PER_TRANSFER_DEFAULT);
98      totalEdgeCount = 0;
99      totalVertexCount = 0;
100   }
101 
102   /** Is this outbound data Collection full,
103    * and ready to transfer?
104    * @param owner the partition owner for the outbound data
105    * @return 'true' if the temp partition data is ready to transfer
106    */
107   public boolean transferThisPartition(PartitionOwner owner) {
108     final int partitionId = owner.getPartitionId();
109     if (getEdgesForPartition(partitionId) >=
110       maxEdgesPerTransfer ||
111       getVerticesForPartition(partitionId) >=
112       maxVerticesPerTransfer) {
113       vertexAccumulator.put(partitionId, 0);
114       edgeAccumulator.put(partitionId, 0);
115       return true;
116     }
117     return false;
118   }
119 
120   /** get current vertex count for a given Collection of
121    * data soon to be transfered to its permanent home.
122    * @param partId the partition id to check the count on.
123    * @return the count of vertices.
124    */
125   private int getVerticesForPartition(final int partId) {
126     return vertexAccumulator.get(partId) == null ?
127       0 : vertexAccumulator.get(partId);
128   }
129 
130   /** get current edge count for a given Collection of
131    * data soon to be transfered to its permanent home.
132    * @param partId the partition id to check the count on.
133    * @return the count of edges.
134    */
135   private int getEdgesForPartition(final int partId) {
136     return edgeAccumulator.get(partId) == null ?
137       0 : edgeAccumulator.get(partId);
138   }
139 
140   /** Clear storage to reset for reading new InputSplit */
141   public void clearCounters() {
142     totalEdgeCount = 0;
143     totalVertexCount = 0;
144     vertexAccumulator.clear();
145     edgeAccumulator.clear();
146   }
147 
148   /** Increment V &amp; E counts for new vertex read, store values
149    * for that outgoing _temporary_ Partition, which shares the
150    * Partition ID for the actual remote Partition the collection
151    * will eventually be processed in.
152    * @param partitionOwner the owner of the Partition this data
153    *  will eventually belong to.
154    * @param vertex the vertex to extract counts from.
155    * @param <I> the vertex id type.
156    * @param <V> the vertex value type.
157    * @param <E> the edge value type.
158    */
159   public <I extends WritableComparable, V extends Writable,
160   E extends Writable> void
161   incrementCounters(PartitionOwner partitionOwner,
162     Vertex<I, V, E> vertex) {
163     final int id = partitionOwner.getPartitionId();
164     // vertex counts
165     vertexAccumulator
166       .put(id, getVerticesForPartition(id) + 1);
167     totalVertexCount++;
168     // edge counts
169     totalEdgeCount += vertex.getNumEdges();
170     edgeAccumulator.put(id, getEdgesForPartition(id) +
171       vertex.getNumEdges());
172   }
173 
174   /** Getter for MAX edge count to initiate a transfer
175     * @return max edge count per transfer */
176   public long getMaxEdgesPerTransfer() {
177     return maxEdgesPerTransfer;
178   }
179 
180   /** Getter for MAX vertex count to initiate a transfer
181    * @return max edge count per transfer */
182   public long getMaxVerticesPerTransfer() {
183     return maxVerticesPerTransfer;
184   }
185 
186   /** Getter for total edge count for the current InputSplit
187     * @return the # of total edges counted in this InputSplit */
188   public long getTotalEdges() {
189     return totalEdgeCount;
190   }
191 
192   /** Getter for total vetex count for the current InputSplit
193    * @return the total # of vertices counted in this InputSplit */
194   public long getTotalVertices() {
195     return totalVertexCount;
196   }
197 }
198