This project has retired. For details please refer to its Attic page.
InMemoryMessageStoreFactory xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import org.apache.giraph.combiner.MessageCombiner;
22  import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
23  import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
24  import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
25  import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
26  import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListPerVertexStore;
27  import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
28  import org.apache.giraph.conf.GiraphConstants;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.conf.MessageClasses;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
33  import org.apache.giraph.types.ops.TypeOpsUtils;
34  import org.apache.hadoop.io.DoubleWritable;
35  import org.apache.hadoop.io.FloatWritable;
36  import org.apache.hadoop.io.IntWritable;
37  import org.apache.hadoop.io.LongWritable;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  import org.apache.log4j.Logger;
41  
42  /**
43   * Message store factory which produces message stores which hold all
44   * messages in memory. Depending on whether or not combiner is currently used,
45   * this factory creates {@link OneMessagePerVertexStore} or
46   * {@link ByteArrayMessagesPerVertexStore}
47   *
48   * @param <I> Vertex id
49   * @param <M> Message data
50   */
51  @SuppressWarnings("unchecked")
52  public class InMemoryMessageStoreFactory<I extends WritableComparable,
53      M extends Writable>
54      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
55    /** Class logger */
56    private static final Logger LOG =
57        Logger.getLogger(InMemoryMessageStoreFactory.class);
58  
59    /** Partition info */
60    protected PartitionSplitInfo<I> partitionInfo;
61    /** Hadoop configuration */
62    protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
63  
64    /**
65     * Default constructor allowing class invocation via Reflection.
66     */
67    public InMemoryMessageStoreFactory() {
68    }
69  
70    /**
71     * MessageStore to be used when combiner is enabled
72     *
73     * @param messageClass message class
74     * @param messageValueFactory message value factory
75     * @param messageCombiner message combiner
76     * @return message store
77     */
78    protected MessageStore<I, M> newStoreWithCombiner(
79        Class<M> messageClass,
80        MessageValueFactory<M> messageValueFactory,
81        MessageCombiner<? super I, M> messageCombiner) {
82      MessageStore messageStore;
83      Class<I> vertexIdClass = conf.getVertexIdClass();
84      if (vertexIdClass.equals(IntWritable.class) &&
85          messageClass.equals(FloatWritable.class)) {
86        messageStore = new IntFloatMessageStore(
87            (PartitionSplitInfo<IntWritable>) partitionInfo,
88            (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
89      } else if (vertexIdClass.equals(LongWritable.class) &&
90          messageClass.equals(DoubleWritable.class)) {
91        messageStore = new LongDoubleMessageStore(
92            (PartitionSplitInfo<LongWritable>) partitionInfo,
93            (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
94      } else {
95        PrimitiveIdTypeOps<I> idTypeOps =
96            TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
97        if (idTypeOps != null) {
98          messageStore = new IdOneMessagePerVertexStore<>(
99            messageValueFactory, partitionInfo, messageCombiner, conf);
100       } else {
101         messageStore = new OneMessagePerVertexStore<I, M>(
102           messageValueFactory, partitionInfo, messageCombiner, conf);
103       }
104     }
105     return messageStore;
106   }
107 
108   /**
109    * MessageStore to be used when combiner is not enabled
110    *
111    * @param messageClass message class
112    * @param messageValueFactory message value factory
113    * @param encodeAndStore message encode and store type
114    * @return message store
115    */
116   protected MessageStore<I, M> newStoreWithoutCombiner(
117       Class<M> messageClass,
118       MessageValueFactory<M> messageValueFactory,
119       MessageEncodeAndStoreType encodeAndStore) {
120     MessageStore messageStore = null;
121     Class<I> vertexIdClass = conf.getVertexIdClass();
122     // a special case for LongWritable with POINTER_LIST_PER_VERTEX
123     if (vertexIdClass.equals(LongWritable.class) && encodeAndStore.equals(
124         MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
125       messageStore = new LongPointerListPerVertexStore(
126         messageValueFactory, partitionInfo, conf);
127     } else { // GENERAL
128       if (encodeAndStore.equals(
129           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
130           encodeAndStore.equals(
131               MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
132         PrimitiveIdTypeOps<I> idTypeOps =
133             TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
134         if (idTypeOps != null) {
135           messageStore = new IdByteArrayMessageStore<>(
136               messageValueFactory, partitionInfo, conf);
137         } else {
138           messageStore = new ByteArrayMessagesPerVertexStore<>(
139               messageValueFactory, partitionInfo, conf);
140         }
141       } else if (encodeAndStore.equals(
142           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
143         messageStore = new PointerListPerVertexStore<>(
144           messageValueFactory, partitionInfo, conf);
145       }
146     }
147     return messageStore;
148   }
149 
150   @Override
151   public MessageStore<I, M> newStore(
152       MessageClasses<I, M> messageClasses) {
153     Class<M> messageClass = messageClasses.getMessageClass();
154     MessageValueFactory<M> messageValueFactory =
155         messageClasses.createMessageValueFactory(conf);
156     MessageCombiner<? super I, M> messageCombiner =
157         messageClasses.createMessageCombiner(conf);
158     MessageStore messageStore;
159     if (messageCombiner != null) {
160       messageStore = newStoreWithCombiner(
161           messageClass, messageValueFactory, messageCombiner);
162     } else {
163       messageStore = newStoreWithoutCombiner(
164           messageClass, messageValueFactory,
165           messageClasses.getMessageEncodeAndStoreType());
166     }
167 
168     if (LOG.isInfoEnabled()) {
169       LOG.info("newStore: Created " + messageStore.getClass() +
170           " for vertex id " + conf.getVertexIdClass() +
171           " and message value " + messageClass + " and" +
172           (messageCombiner != null ? " message combiner " +
173               messageCombiner.getClass() : " no combiner"));
174     }
175 
176     int asyncMessageStoreThreads =
177         GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(conf);
178     if (asyncMessageStoreThreads > 0) {
179       messageStore = new AsyncMessageStoreWrapper(
180           messageStore,
181           partitionInfo.getPartitionIds(),
182           asyncMessageStoreThreads);
183     }
184 
185     return messageStore;
186   }
187 
188   @Override
189   public void initialize(PartitionSplitInfo<I> partitionInfo,
190       ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
191     this.partitionInfo = partitionInfo;
192     this.conf = conf;
193   }
194 }