View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.combiner.MessageCombiner;
23  import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
24  import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
25  import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
26  import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
27  import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
28  import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
29  import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
30  import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
31  import org.apache.giraph.conf.GiraphConstants;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.conf.MessageClasses;
34  import org.apache.giraph.factories.MessageValueFactory;
35  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
36  import org.apache.giraph.types.ops.TypeOpsUtils;
37  import org.apache.hadoop.io.DoubleWritable;
38  import org.apache.hadoop.io.FloatWritable;
39  import org.apache.hadoop.io.IntWritable;
40  import org.apache.hadoop.io.LongWritable;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.io.WritableComparable;
43  import org.apache.log4j.Logger;
44  
45  /**
46   * Message store factory which produces message stores which hold all
47   * messages in memory. Depending on whether or not combiner is currently used,
48   * this factory creates {@link OneMessagePerVertexStore} or
49   * {@link ByteArrayMessagesPerVertexStore}
50   *
51   * @param <I> Vertex id
52   * @param <M> Message data
53   */
54  @SuppressWarnings("unchecked")
55  public class InMemoryMessageStoreFactory<I extends WritableComparable,
56      M extends Writable>
57      implements MessageStoreFactory<I, M, MessageStore<I, M>> {
58    /** Class logger */
59    private static final Logger LOG =
60        Logger.getLogger(InMemoryMessageStoreFactory.class);
61  
62    /** Service worker */
63    protected CentralizedServiceWorker<I, ?, ?> service;
64    /** Hadoop configuration */
65    protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
66  
67    /**
68     * Default constructor allowing class invocation via Reflection.
69     */
70    public InMemoryMessageStoreFactory() {
71    }
72  
73    /**
74     * MessageStore to be used when combiner is enabled
75     *
76     * @param messageClass message class
77     * @param messageValueFactory message value factory
78     * @param messageCombiner message combiner
79     * @return message store
80     */
81    protected MessageStore<I, M> newStoreWithCombiner(
82        Class<M> messageClass,
83        MessageValueFactory<M> messageValueFactory,
84        MessageCombiner<? super I, M> messageCombiner) {
85      MessageStore messageStore;
86      Class<I> vertexIdClass = conf.getVertexIdClass();
87      if (vertexIdClass.equals(IntWritable.class) &&
88          messageClass.equals(FloatWritable.class)) {
89        messageStore = new IntFloatMessageStore(
90            (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
91            (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
92      } else if (vertexIdClass.equals(LongWritable.class) &&
93          messageClass.equals(DoubleWritable.class)) {
94        messageStore = new LongDoubleMessageStore(
95            (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
96            (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
97      } else {
98        PrimitiveIdTypeOps<I> idTypeOps =
99            TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
100       if (idTypeOps != null) {
101         messageStore = new IdOneMessagePerVertexStore<>(
102             messageValueFactory, service, messageCombiner,
103             conf);
104       } else {
105         messageStore =
106             new OneMessagePerVertexStore<I, M>(messageValueFactory, service,
107                 messageCombiner, conf);
108       }
109     }
110     return messageStore;
111   }
112 
113   /**
114    * MessageStore to be used when combiner is not enabled
115    *
116    * @param messageClass message class
117    * @param messageValueFactory message value factory
118    * @param encodeAndStore message encode and store type
119    * @return message store
120    */
121   protected MessageStore<I, M> newStoreWithoutCombiner(
122       Class<M> messageClass,
123       MessageValueFactory<M> messageValueFactory,
124       MessageEncodeAndStoreType encodeAndStore) {
125     MessageStore messageStore = null;
126     Class<I> vertexIdClass = conf.getVertexIdClass();
127     if (vertexIdClass.equals(IntWritable.class)) { // INT
128       messageStore = new IntByteArrayMessageStore(messageValueFactory,
129           service, conf);
130     } else if (vertexIdClass.equals(LongWritable.class)) { // LONG
131       if (encodeAndStore.equals(
132           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
133           encodeAndStore.equals(
134             MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
135         messageStore = new LongByteArrayMessageStore(messageValueFactory,
136             service, conf);
137       } else if (encodeAndStore.equals(
138           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
139         messageStore = new LongPointerListMessageStore(messageValueFactory,
140             service, conf);
141       }
142     } else { // GENERAL
143       if (encodeAndStore.equals(
144           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
145           encodeAndStore.equals(
146               MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
147         PrimitiveIdTypeOps<I> idTypeOps =
148             TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
149         if (idTypeOps != null) {
150           messageStore = new IdByteArrayMessageStore<>(
151               messageValueFactory, service, conf);
152         } else {
153           messageStore = new ByteArrayMessagesPerVertexStore<>(
154               messageValueFactory, service, conf);
155         }
156       } else if (encodeAndStore.equals(
157           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
158         messageStore = new PointerListPerVertexStore<>(messageValueFactory,
159             service, conf);
160       }
161     }
162     return messageStore;
163   }
164 
165   @Override
166   public MessageStore<I, M> newStore(
167       MessageClasses<I, M> messageClasses) {
168     Class<M> messageClass = messageClasses.getMessageClass();
169     MessageValueFactory<M> messageValueFactory =
170         messageClasses.createMessageValueFactory(conf);
171     MessageCombiner<? super I, M> messageCombiner =
172         messageClasses.createMessageCombiner(conf);
173     MessageStore messageStore;
174     if (messageCombiner != null) {
175       messageStore = newStoreWithCombiner(
176           messageClass, messageValueFactory, messageCombiner);
177     } else {
178       messageStore = newStoreWithoutCombiner(
179           messageClass, messageValueFactory,
180           messageClasses.getMessageEncodeAndStoreType());
181     }
182 
183     if (LOG.isInfoEnabled()) {
184       LOG.info("newStore: Created " + messageStore.getClass() +
185           " for vertex id " + conf.getVertexIdClass() +
186           " and message value " + messageClass + " and" +
187           (messageCombiner != null ? " message combiner " +
188               messageCombiner.getClass() : " no combiner"));
189     }
190 
191     int asyncMessageStoreThreads =
192         GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(conf);
193     if (asyncMessageStoreThreads > 0) {
194       messageStore = new AsyncMessageStoreWrapper(
195           messageStore,
196           service.getPartitionStore().getPartitionIds(),
197           asyncMessageStoreThreads);
198     }
199 
200     return messageStore;
201   }
202 
203   @Override
204   public void initialize(CentralizedServiceWorker<I, ?, ?> service,
205       ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
206     this.service = service;
207     this.conf = conf;
208   }
209 }