View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.edge;
20  
21  import com.google.common.collect.MapMaker;
22  import org.apache.giraph.bsp.CentralizedServiceWorker;
23  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.ooc.OutOfCoreEngine;
27  import org.apache.giraph.partition.Partition;
28  import org.apache.giraph.utils.CallableFactory;
29  import org.apache.giraph.utils.ProgressableUtils;
30  import org.apache.giraph.utils.Trimmable;
31  import org.apache.giraph.utils.VertexIdEdgeIterator;
32  import org.apache.giraph.utils.VertexIdEdges;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  import org.apache.hadoop.util.Progressable;
36  import org.apache.log4j.Logger;
37  
38  import java.io.DataInput;
39  import java.io.DataOutput;
40  import java.io.IOException;
41  import java.util.Iterator;
42  import java.util.Map;
43  import java.util.concurrent.Callable;
44  import java.util.concurrent.ConcurrentMap;
45  
46  import static com.google.common.base.Preconditions.checkState;
47  
48  /**
49   * Basic implementation of edges store, extended this to easily define simple
50   * and primitive edge stores
51   *
52   * @param <I> Vertex id
53   * @param <V> Vertex value
54   * @param <E> Edge value
55   * @param <K> Key corresponding to Vertex id
56   * @param <Et> Entry type
57   */
58  public abstract class AbstractEdgeStore<I extends WritableComparable,
59    V extends Writable, E extends Writable, K, Et>
60    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
61    implements EdgeStore<I, V, E> {
62    /** Class logger */
63    private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
64    /** Service worker. */
65    protected CentralizedServiceWorker<I, V, E> service;
66    /** Giraph configuration. */
67    protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
68    /** Progressable to report progress. */
69    protected Progressable progressable;
70    /** Map used to temporarily store incoming edges. */
71    protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
72    /**
73     * Whether the chosen {@link OutEdges} implementation allows for Edge
74     * reuse.
75     */
76    protected boolean reuseEdgeObjects;
77    /**
78     * Whether the {@link OutEdges} class used during input is different
79     * from the one used during computation.
80     */
81    protected boolean useInputOutEdges;
82    /** Whether we spilled edges on disk */
83    private boolean hasEdgesOnDisk = false;
84  
85    /**
86     * Constructor.
87     *
88     * @param service Service worker
89     * @param configuration Configuration
90     * @param progressable Progressable
91     */
92    public AbstractEdgeStore(
93      CentralizedServiceWorker<I, V, E> service,
94      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
95      Progressable progressable) {
96      this.service = service;
97      this.configuration = configuration;
98      this.progressable = progressable;
99      transientEdges = new MapMaker().concurrencyLevel(
100       configuration.getNettyServerExecutionConcurrency()).makeMap();
101     reuseEdgeObjects = configuration.reuseEdgeObjects();
102     useInputOutEdges = configuration.useInputOutEdges();
103   }
104 
105   /**
106    * Get vertexId for a given key
107    *
108    * @param entry for vertexId key
109    * @param representativeVertexId representativeVertexId
110    * @return vertex Id
111    */
112   protected abstract I getVertexId(Et entry, I representativeVertexId);
113 
114   /**
115    * Create vertexId from a given key
116    *
117    * @param entry for vertexId key
118    * @return new vertexId
119    */
120   protected abstract I createVertexId(Et entry);
121 
122   /**
123    * Get OutEdges for a given partition
124    *
125    * @param partitionId id of partition
126    * @return OutEdges for the partition
127    */
128   protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
129 
130   /**
131    * Return the OutEdges for a given partition
132    *
133    * @param entry for vertexId key
134    * @return out edges
135    */
136   protected abstract OutEdges<I, E> getPartitionEdges(Et entry);
137 
138   /**
139    * Writes the given key to the output
140    *
141    * @param key input key to be written
142    * @param output output to write the key to
143    */
144   protected abstract void writeVertexKey(K key, DataOutput output)
145   throws IOException;
146 
147   /**
148    * Reads the given key from the input
149    *
150    * @param input input to read the key from
151    * @return Key read from the input
152    */
153   protected abstract K readVertexKey(DataInput input) throws IOException;
154 
155   /**
156    * Get iterator for partition edges
157    *
158    * @param partitionEdges map of out-edges for vertices in a partition
159    * @return iterator
160    */
161   protected abstract Iterator<Et>
162   getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
163 
164   @Override
165   public boolean hasEdgesForPartition(int partitionId) {
166     return transientEdges.containsKey(partitionId);
167   }
168 
169   @Override
170   public void writePartitionEdgeStore(int partitionId, DataOutput output)
171       throws IOException {
172     Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
173     if (edges != null) {
174       output.writeInt(edges.size());
175       if (edges.size() > 0) {
176         hasEdgesOnDisk = true;
177       }
178       for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
179         writeVertexKey(edge.getKey(), output);
180         edge.getValue().write(output);
181       }
182     }
183   }
184 
185   @Override
186   public void readPartitionEdgeStore(int partitionId, DataInput input)
187       throws IOException {
188     checkState(!transientEdges.containsKey(partitionId),
189         "readPartitionEdgeStore: reading a partition that is already there in" +
190             " the partition store (impossible)");
191     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
192     int numEntries = input.readInt();
193     for (int i = 0; i < numEntries; ++i) {
194       K vertexKey = readVertexKey(input);
195       OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges();
196       edges.readFields(input);
197       partitionEdges.put(vertexKey, edges);
198     }
199   }
200 
201   /**
202    * Get out-edges for a given vertex
203    *
204    * @param vertexIdEdgeIterator vertex Id Edge iterator
205    * @param partitionEdgesIn map of out-edges for vertices in a partition
206    * @return out-edges for the vertex
207    */
208   protected abstract OutEdges<I, E> getVertexOutEdges(
209     VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
210     Map<K, OutEdges<I, E>> partitionEdgesIn);
211 
212   @Override
213   public void addPartitionEdges(
214     int partitionId, VertexIdEdges<I, E> edges) {
215     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
216 
217     VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
218         edges.getVertexIdEdgeIterator();
219     while (vertexIdEdgeIterator.hasNext()) {
220       vertexIdEdgeIterator.next();
221       Edge<I, E> edge = reuseEdgeObjects ?
222           vertexIdEdgeIterator.getCurrentEdge() :
223           vertexIdEdgeIterator.releaseCurrentEdge();
224       OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
225           partitionEdges);
226       synchronized (outEdges) {
227         outEdges.add(edge);
228       }
229     }
230   }
231 
232   /**
233    * Convert the input edges to the {@link OutEdges} data structure used
234    * for computation (if different).
235    *
236    * @param inputEdges Input edges
237    * @return Compute edges
238    */
239   private OutEdges<I, E> convertInputToComputeEdges(
240     OutEdges<I, E> inputEdges) {
241     if (!useInputOutEdges) {
242       return inputEdges;
243     } else {
244       return configuration.createAndInitializeOutEdges(inputEdges);
245     }
246   }
247 
248   @Override
249   public void moveEdgesToVertices() {
250     final boolean createSourceVertex = configuration.getCreateSourceVertex();
251     if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
252       if (LOG.isInfoEnabled()) {
253         LOG.info("moveEdgesToVertices: No edges to move");
254       }
255       return;
256     }
257 
258     if (LOG.isInfoEnabled()) {
259       LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
260     }
261 
262     service.getPartitionStore().startIteration();
263     int numThreads = configuration.getNumInputSplitsThreads();
264 
265     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
266       @Override
267       public Callable<Void> newCallable(int callableId) {
268         return new Callable<Void>() {
269           @Override
270           public Void call() throws Exception {
271             Integer partitionId;
272             I representativeVertexId = configuration.createVertexId();
273             OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
274             if (oocEngine != null) {
275               oocEngine.processingThreadStart();
276             }
277             while (true) {
278               Partition<I, V, E> partition =
279                   service.getPartitionStore().getNextPartition();
280               if (partition == null) {
281                 break;
282               }
283               Map<K, OutEdges<I, E>> partitionEdges =
284                   transientEdges.remove(partition.getId());
285               if (partitionEdges == null) {
286                 service.getPartitionStore().putPartition(partition);
287                 continue;
288               }
289 
290               Iterator<Et> iterator =
291                   getPartitionEdgesIterator(partitionEdges);
292               // process all vertices in given partition
293               int count = 0;
294               while (iterator.hasNext()) {
295                 // If out-of-core mechanism is used, check whether this thread
296                 // can stay active or it should temporarily suspend and stop
297                 // processing and generating more data for the moment.
298                 if (oocEngine != null &&
299                     (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
300                   oocEngine.activeThreadCheckIn();
301                 }
302                 Et entry = iterator.next();
303                 I vertexId = getVertexId(entry, representativeVertexId);
304                 OutEdges<I, E> outEdges = convertInputToComputeEdges(
305                   getPartitionEdges(entry));
306                 Vertex<I, V, E> vertex = partition.getVertex(vertexId);
307                 // If the source vertex doesn't exist, create it. Otherwise,
308                 // just set the edges.
309                 if (vertex == null) {
310                   if (createSourceVertex) {
311                     // createVertex only if it is allowed by configuration
312                     vertex = configuration.createVertex();
313                     vertex.initialize(createVertexId(entry),
314                         configuration.createVertexValue(), outEdges);
315                     partition.putVertex(vertex);
316                   }
317                 } else {
318                   // A vertex may exist with or without edges initially
319                   // and optimize the case of no initial edges
320                   if (vertex.getNumEdges() == 0) {
321                     vertex.setEdges(outEdges);
322                   } else {
323                     for (Edge<I, E> edge : outEdges) {
324                       vertex.addEdge(edge);
325                     }
326                   }
327                   if (vertex instanceof Trimmable) {
328                     ((Trimmable) vertex).trim();
329                   }
330                   // Some Partition implementations (e.g. ByteArrayPartition)
331                   // require us to put back the vertex after modifying it.
332                   partition.saveVertex(vertex);
333                 }
334                 iterator.remove();
335               }
336               // Some PartitionStore implementations
337               // (e.g. DiskBackedPartitionStore) require us to put back the
338               // partition after modifying it.
339               service.getPartitionStore().putPartition(partition);
340             }
341             if (oocEngine != null) {
342               oocEngine.processingThreadFinish();
343             }
344             return null;
345           }
346         };
347       }
348     };
349     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
350         "move-edges-%d", progressable);
351 
352     // remove all entries
353     transientEdges.clear();
354 
355     if (LOG.isInfoEnabled()) {
356       LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
357           "vertices.");
358     }
359   }
360 }