This project has retired. For details please refer to its Attic page.
AbstractEdgeStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.edge;
20  
21  import com.google.common.collect.MapMaker;
22  import org.apache.giraph.bsp.CentralizedServiceWorker;
23  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
24  import org.apache.giraph.conf.GiraphConstants;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.ooc.OutOfCoreEngine;
28  import org.apache.giraph.partition.Partition;
29  import org.apache.giraph.utils.CallableFactory;
30  import org.apache.giraph.utils.ProgressCounter;
31  import org.apache.giraph.utils.ProgressableUtils;
32  import org.apache.giraph.utils.ThreadLocalProgressCounter;
33  import org.apache.giraph.utils.Trimmable;
34  import org.apache.giraph.utils.VertexIdEdgeIterator;
35  import org.apache.giraph.utils.VertexIdEdges;
36  import org.apache.hadoop.io.Writable;
37  import org.apache.hadoop.io.WritableComparable;
38  import org.apache.hadoop.util.Progressable;
39  import org.apache.log4j.Logger;
40  
41  import java.io.DataInput;
42  import java.io.DataOutput;
43  import java.io.IOException;
44  import java.util.Iterator;
45  import java.util.Map;
46  import java.util.concurrent.Callable;
47  import java.util.concurrent.ConcurrentMap;
48  
49  import static com.google.common.base.Preconditions.checkState;
50  
51  /**
52   * Basic implementation of edges store, extended this to easily define simple
53   * and primitive edge stores
54   *
55   * @param <I> Vertex id
56   * @param <V> Vertex value
57   * @param <E> Edge value
58   * @param <K> Key corresponding to Vertex id
59   * @param <Et> Entry type
60   */
61  public abstract class AbstractEdgeStore<I extends WritableComparable,
62    V extends Writable, E extends Writable, K, Et>
63    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
64    implements EdgeStore<I, V, E> {
65    /** Used to keep track of progress during the move-edges process */
66    public static final ThreadLocalProgressCounter PROGRESS_COUNTER =
67      new ThreadLocalProgressCounter();
68    /** Class logger */
69    private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
70    /** Service worker. */
71    protected CentralizedServiceWorker<I, V, E> service;
72    /** Giraph configuration. */
73    protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
74    /** Progressable to report progress. */
75    protected Progressable progressable;
76    /** Map used to temporarily store incoming edges. */
77    protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
78    /**
79     * Whether the chosen {@link OutEdges} implementation allows for Edge
80     * reuse.
81     */
82    protected boolean reuseEdgeObjects;
83    /**
84     * Whether the {@link OutEdges} class used during input is different
85     * from the one used during computation.
86     */
87    protected boolean useInputOutEdges;
88    /** Whether we spilled edges on disk */
89    private volatile boolean hasEdgesOnDisk = false;
90    /** Create source vertices */
91    private CreateSourceVertexCallback<I> createSourceVertexCallback;
92  
93  
94    /**
95     * Constructor.
96     *
97     * @param service Service worker
98     * @param configuration Configuration
99     * @param progressable Progressable
100    */
101   public AbstractEdgeStore(
102     CentralizedServiceWorker<I, V, E> service,
103     ImmutableClassesGiraphConfiguration<I, V, E> configuration,
104     Progressable progressable) {
105     this.service = service;
106     this.configuration = configuration;
107     this.progressable = progressable;
108     transientEdges = new MapMaker().concurrencyLevel(
109       configuration.getNettyServerExecutionConcurrency()).makeMap();
110     reuseEdgeObjects = configuration.reuseEdgeObjects();
111     useInputOutEdges = configuration.useInputOutEdges();
112     createSourceVertexCallback =
113         GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
114             .newInstance(configuration);
115   }
116 
117   /**
118    * Get vertexId for a given key
119    *
120    * @param entry for vertexId key
121    * @param representativeVertexId representativeVertexId
122    * @return vertex Id
123    */
124   protected abstract I getVertexId(Et entry, I representativeVertexId);
125 
126   /**
127    * Create vertexId from a given key
128    *
129    * @param entry for vertexId key
130    * @return new vertexId
131    */
132   protected abstract I createVertexId(Et entry);
133 
134   /**
135    * Get OutEdges for a given partition
136    *
137    * @param partitionId id of partition
138    * @return OutEdges for the partition
139    */
140   protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
141 
142   /**
143    * Return the OutEdges for a given partition
144    *
145    * @param entry for vertexId key
146    * @return out edges
147    */
148   protected abstract OutEdges<I, E> getPartitionEdges(Et entry);
149 
150   /**
151    * Writes the given key to the output
152    *
153    * @param key input key to be written
154    * @param output output to write the key to
155    */
156   protected abstract void writeVertexKey(K key, DataOutput output)
157   throws IOException;
158 
159   /**
160    * Reads the given key from the input
161    *
162    * @param input input to read the key from
163    * @return Key read from the input
164    */
165   protected abstract K readVertexKey(DataInput input) throws IOException;
166 
167   /**
168    * Get iterator for partition edges
169    *
170    * @param partitionEdges map of out-edges for vertices in a partition
171    * @return iterator
172    */
173   protected abstract Iterator<Et>
174   getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
175 
176   @Override
177   public boolean hasEdgesForPartition(int partitionId) {
178     return transientEdges.containsKey(partitionId);
179   }
180 
181   @Override
182   public void writePartitionEdgeStore(int partitionId, DataOutput output)
183       throws IOException {
184     Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
185     if (edges != null) {
186       output.writeInt(edges.size());
187       if (edges.size() > 0) {
188         hasEdgesOnDisk = true;
189       }
190       for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
191         writeVertexKey(edge.getKey(), output);
192         edge.getValue().write(output);
193       }
194     }
195   }
196 
197   @Override
198   public void readPartitionEdgeStore(int partitionId, DataInput input)
199       throws IOException {
200     checkState(!transientEdges.containsKey(partitionId),
201         "readPartitionEdgeStore: reading a partition that is already there in" +
202             " the partition store (impossible)");
203     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
204     int numEntries = input.readInt();
205     for (int i = 0; i < numEntries; ++i) {
206       K vertexKey = readVertexKey(input);
207       OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges();
208       edges.readFields(input);
209       partitionEdges.put(vertexKey, edges);
210     }
211   }
212 
213   /**
214    * Get out-edges for a given vertex
215    *
216    * @param vertexIdEdgeIterator vertex Id Edge iterator
217    * @param partitionEdgesIn map of out-edges for vertices in a partition
218    * @return out-edges for the vertex
219    */
220   protected abstract OutEdges<I, E> getVertexOutEdges(
221     VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
222     Map<K, OutEdges<I, E>> partitionEdgesIn);
223 
224   @Override
225   public void addPartitionEdges(
226     int partitionId, VertexIdEdges<I, E> edges) {
227     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
228 
229     VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
230         edges.getVertexIdEdgeIterator();
231     while (vertexIdEdgeIterator.hasNext()) {
232       vertexIdEdgeIterator.next();
233       Edge<I, E> edge = reuseEdgeObjects ?
234           vertexIdEdgeIterator.getCurrentEdge() :
235           vertexIdEdgeIterator.releaseCurrentEdge();
236       OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
237           partitionEdges);
238       synchronized (outEdges) {
239         outEdges.add(edge);
240       }
241     }
242   }
243 
244   /**
245    * Convert the input edges to the {@link OutEdges} data structure used
246    * for computation (if different).
247    *
248    * @param inputEdges Input edges
249    * @return Compute edges
250    */
251   private OutEdges<I, E> convertInputToComputeEdges(
252     OutEdges<I, E> inputEdges) {
253     if (!useInputOutEdges) {
254       return inputEdges;
255     } else {
256       return configuration.createAndInitializeOutEdges(inputEdges);
257     }
258   }
259 
260   @Override
261   public void moveEdgesToVertices() {
262     if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
263       if (LOG.isInfoEnabled()) {
264         LOG.info("moveEdgesToVertices: No edges to move");
265       }
266       return;
267     }
268 
269     if (LOG.isInfoEnabled()) {
270       LOG.info("moveEdgesToVertices: Moving incoming edges to " +
271           "vertices. Using " + createSourceVertexCallback);
272     }
273 
274     service.getPartitionStore().startIteration();
275     int numThreads = configuration.getNumInputSplitsThreads();
276 
277     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
278       @Override
279       public Callable<Void> newCallable(int callableId) {
280         return new Callable<Void>() {
281           @Override
282           public Void call() throws Exception {
283             I representativeVertexId = configuration.createVertexId();
284             OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
285             if (oocEngine != null) {
286               oocEngine.processingThreadStart();
287             }
288             ProgressCounter numVerticesProcessed = PROGRESS_COUNTER.get();
289             while (true) {
290               Partition<I, V, E> partition =
291                   service.getPartitionStore().getNextPartition();
292               if (partition == null) {
293                 break;
294               }
295               Map<K, OutEdges<I, E>> partitionEdges =
296                   transientEdges.remove(partition.getId());
297               if (partitionEdges == null) {
298                 service.getPartitionStore().putPartition(partition);
299                 continue;
300               }
301 
302               Iterator<Et> iterator =
303                   getPartitionEdgesIterator(partitionEdges);
304               // process all vertices in given partition
305               int count = 0;
306               while (iterator.hasNext()) {
307                 // If out-of-core mechanism is used, check whether this thread
308                 // can stay active or it should temporarily suspend and stop
309                 // processing and generating more data for the moment.
310                 if (oocEngine != null &&
311                     (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
312                   oocEngine.activeThreadCheckIn();
313                 }
314                 Et entry = iterator.next();
315                 I vertexId = getVertexId(entry, representativeVertexId);
316                 OutEdges<I, E> outEdges = convertInputToComputeEdges(
317                   getPartitionEdges(entry));
318                 Vertex<I, V, E> vertex = partition.getVertex(vertexId);
319                 // If the source vertex doesn't exist, create it. Otherwise,
320                 // just set the edges.
321                 if (vertex == null) {
322                   if (createSourceVertexCallback
323                       .shouldCreateSourceVertex(vertexId)) {
324                     // createVertex only if it is allowed by configuration
325                     vertex = configuration.createVertex();
326                     vertex.initialize(createVertexId(entry),
327                         configuration.createVertexValue(), outEdges);
328                     partition.putVertex(vertex);
329                   }
330                 } else {
331                   // A vertex may exist with or without edges initially
332                   // and optimize the case of no initial edges
333                   if (vertex.getNumEdges() == 0) {
334                     vertex.setEdges(outEdges);
335                   } else {
336                     for (Edge<I, E> edge : outEdges) {
337                       vertex.addEdge(edge);
338                     }
339                   }
340                   if (vertex instanceof Trimmable) {
341                     ((Trimmable) vertex).trim();
342                   }
343                   // Some Partition implementations (e.g. ByteArrayPartition)
344                   // require us to put back the vertex after modifying it.
345                   partition.saveVertex(vertex);
346                 }
347                 numVerticesProcessed.inc();
348                 iterator.remove();
349               }
350               // Some PartitionStore implementations
351               // (e.g. DiskBackedPartitionStore) require us to put back the
352               // partition after modifying it.
353               service.getPartitionStore().putPartition(partition);
354             }
355             if (oocEngine != null) {
356               oocEngine.processingThreadFinish();
357             }
358             return null;
359           }
360         };
361       }
362     };
363     ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
364         "move-edges-%d", progressable);
365 
366     // remove all entries
367     transientEdges.clear();
368 
369     if (LOG.isInfoEnabled()) {
370       LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
371           "vertices.");
372     }
373   }
374 }