This project has retired. For details please refer to its Attic page.
InMemoryEdgeStoreFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.edge;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.edge.primitives.IntEdgeStore;
24  import org.apache.giraph.edge.primitives.LongEdgeStore;
25  import org.apache.hadoop.io.IntWritable;
26  import org.apache.hadoop.io.LongWritable;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.util.Progressable;
30  
31  /**
32   * Edge store factory which produces message stores which hold all
33   * edges in memory. It creates primitive edges stores when vertex id is
34   * IntWritable or LongWritable
35   *
36   * @param <I> Vertex id
37   * @param <V> Vertex value
38   * @param <E> Edge value
39   */
40  @SuppressWarnings("unchecked")
41  public class InMemoryEdgeStoreFactory<I extends WritableComparable,
42    V extends Writable, E extends Writable>
43    implements EdgeStoreFactory<I, V, E> {
44    /** Service worker. */
45    protected CentralizedServiceWorker<I, V, E> service;
46    /** Giraph configuration. */
47    protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
48    /** Progressable to report progress. */
49    protected Progressable progressable;
50  
51    @Override
52    public EdgeStore<I, V, E> newStore() {
53      Class<I> vertexIdClass = conf.getVertexIdClass();
54      EdgeStore<I, V, E> edgeStore;
55      if (vertexIdClass.equals(IntWritable.class)) {
56        edgeStore = (EdgeStore<I, V, E>) new IntEdgeStore<>(
57            (CentralizedServiceWorker<IntWritable, V, E>) service,
58            (ImmutableClassesGiraphConfiguration<IntWritable, V, E>) conf,
59            progressable);
60      } else if (vertexIdClass.equals(LongWritable.class)) {
61        edgeStore = (EdgeStore<I, V, E>) new LongEdgeStore<>(
62            (CentralizedServiceWorker<LongWritable, V, E>) service,
63            (ImmutableClassesGiraphConfiguration<LongWritable, V, E>) conf,
64            progressable);
65      } else {
66        edgeStore = new SimpleEdgeStore<>(service, conf, progressable);
67      }
68      return edgeStore;
69    }
70  
71    @Override
72    public void initialize(CentralizedServiceWorker<I, V, E> service,
73      ImmutableClassesGiraphConfiguration<I, V, E> conf,
74      Progressable progressable) {
75      this.service = service;
76      this.conf = conf;
77      this.progressable = progressable;
78    }
79  }