This project has retired. For details please refer to its Attic page.
SimpleEdgeStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.edge;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.utils.VertexIdEdgeIterator;
24  import org.apache.hadoop.io.Writable;
25  import org.apache.hadoop.io.WritableComparable;
26  import org.apache.hadoop.util.Progressable;
27  
28  import com.google.common.collect.MapMaker;
29  
30  import java.io.DataInput;
31  import java.io.DataOutput;
32  import java.io.IOException;
33  import java.util.Iterator;
34  import java.util.Map;
35  import java.util.concurrent.ConcurrentMap;
36  
37  /**
38   * Simple in memory edge store which supports any type of ids.
39   *
40   * @param <I> Vertex id
41   * @param <V> Vertex value
42   * @param <E> Edge value
43   */
44  public class SimpleEdgeStore<I extends WritableComparable,
45    V extends Writable, E extends Writable>
46    extends AbstractEdgeStore<I, V, E, I,
47    Map.Entry<I, OutEdges<I, E>>> {
48  
49    /**
50     * Constructor.
51     *
52     * @param service Service worker
53     * @param configuration Configuration
54     * @param progressable Progressable
55     */
56    public SimpleEdgeStore(
57      CentralizedServiceWorker<I, V, E> service,
58      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
59      Progressable progressable) {
60      super(service, configuration, progressable);
61    }
62  
63    @Override
64    protected I getVertexId(Map.Entry<I, OutEdges<I, E>> entry,
65      I representativeVertexId) {
66      return entry.getKey();
67    }
68  
69    @Override
70    protected I createVertexId(Map.Entry<I, OutEdges<I, E>> entry) {
71      return entry.getKey();
72    }
73  
74    @Override
75    protected ConcurrentMap<I, OutEdges<I, E>> getPartitionEdges(
76      int partitionId) {
77      ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
78          (ConcurrentMap<I, OutEdges<I, E>>) transientEdges.get(partitionId);
79      if (partitionEdges == null) {
80        ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
81            new MapMaker().concurrencyLevel(
82                configuration.getNettyServerExecutionConcurrency()).makeMap();
83        partitionEdges = (ConcurrentMap<I, OutEdges<I, E>>)
84            transientEdges.putIfAbsent(partitionId, newPartitionEdges);
85        if (partitionEdges == null) {
86          partitionEdges = newPartitionEdges;
87        }
88      }
89      return partitionEdges;
90    }
91  
92    @Override
93    protected OutEdges<I, E> getPartitionEdges(
94      Map.Entry<I, OutEdges<I, E>> entry) {
95      return entry.getValue();
96    }
97  
98    @Override
99    protected void writeVertexKey(I key, DataOutput output) throws IOException {
100     key.write(output);
101   }
102 
103   @Override
104   protected I readVertexKey(DataInput input) throws IOException {
105     I id = configuration.createVertexId();
106     id.readFields(input);
107     return id;
108   }
109 
110   @Override
111   protected Iterator<Map.Entry<I, OutEdges<I, E>>>
112   getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) {
113     return partitionEdges.entrySet().iterator();
114   }
115 
116   @Override
117   protected OutEdges<I, E> getVertexOutEdges(
118       VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
119       Map<I, OutEdges<I, E>> partitionEdgesIn) {
120     ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
121         (ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;
122     I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
123     OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
124     if (outEdges == null) {
125       OutEdges<I, E> newOutEdges =
126           configuration.createAndInitializeInputOutEdges();
127       outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
128       if (outEdges == null) {
129         outEdges = newOutEdges;
130         // Since we had to use the vertex id as a new key in the map,
131         // we need to release the object.
132         vertexIdEdgeIterator.releaseCurrentVertexId();
133       }
134     }
135     return outEdges;
136   }
137 }