This project has retired. For details please refer to its
Attic page.
InMemoryMessageStoreFactory xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import org.apache.giraph.combiner.MessageCombiner;
22 import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
23 import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
24 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
25 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
26 import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListPerVertexStore;
27 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
28 import org.apache.giraph.conf.GiraphConstants;
29 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30 import org.apache.giraph.conf.MessageClasses;
31 import org.apache.giraph.factories.MessageValueFactory;
32 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
33 import org.apache.giraph.types.ops.TypeOpsUtils;
34 import org.apache.hadoop.io.DoubleWritable;
35 import org.apache.hadoop.io.FloatWritable;
36 import org.apache.hadoop.io.IntWritable;
37 import org.apache.hadoop.io.LongWritable;
38 import org.apache.hadoop.io.Writable;
39 import org.apache.hadoop.io.WritableComparable;
40 import org.apache.log4j.Logger;
41
42
43
44
45
46
47
48
49
50
51 @SuppressWarnings("unchecked")
52 public class InMemoryMessageStoreFactory<I extends WritableComparable,
53 M extends Writable>
54 implements MessageStoreFactory<I, M, MessageStore<I, M>> {
55
56 private static final Logger LOG =
57 Logger.getLogger(InMemoryMessageStoreFactory.class);
58
59
60 protected PartitionSplitInfo<I> partitionInfo;
61
62 protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
63
64
65
66
67 public InMemoryMessageStoreFactory() {
68 }
69
70
71
72
73
74
75
76
77
78 protected MessageStore<I, M> newStoreWithCombiner(
79 Class<M> messageClass,
80 MessageValueFactory<M> messageValueFactory,
81 MessageCombiner<? super I, M> messageCombiner) {
82 MessageStore messageStore;
83 Class<I> vertexIdClass = conf.getVertexIdClass();
84 if (vertexIdClass.equals(IntWritable.class) &&
85 messageClass.equals(FloatWritable.class)) {
86 messageStore = new IntFloatMessageStore(
87 (PartitionSplitInfo<IntWritable>) partitionInfo,
88 (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
89 } else if (vertexIdClass.equals(LongWritable.class) &&
90 messageClass.equals(DoubleWritable.class)) {
91 messageStore = new LongDoubleMessageStore(
92 (PartitionSplitInfo<LongWritable>) partitionInfo,
93 (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
94 } else {
95 PrimitiveIdTypeOps<I> idTypeOps =
96 TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
97 if (idTypeOps != null) {
98 messageStore = new IdOneMessagePerVertexStore<>(
99 messageValueFactory, partitionInfo, messageCombiner, conf);
100 } else {
101 messageStore = new OneMessagePerVertexStore<I, M>(
102 messageValueFactory, partitionInfo, messageCombiner, conf);
103 }
104 }
105 return messageStore;
106 }
107
108
109
110
111
112
113
114
115
116 protected MessageStore<I, M> newStoreWithoutCombiner(
117 Class<M> messageClass,
118 MessageValueFactory<M> messageValueFactory,
119 MessageEncodeAndStoreType encodeAndStore) {
120 MessageStore messageStore = null;
121 Class<I> vertexIdClass = conf.getVertexIdClass();
122
123 if (vertexIdClass.equals(LongWritable.class) && encodeAndStore.equals(
124 MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
125 messageStore = new LongPointerListPerVertexStore(
126 messageValueFactory, partitionInfo, conf);
127 } else {
128 if (encodeAndStore.equals(
129 MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
130 encodeAndStore.equals(
131 MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
132 PrimitiveIdTypeOps<I> idTypeOps =
133 TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
134 if (idTypeOps != null) {
135 messageStore = new IdByteArrayMessageStore<>(
136 messageValueFactory, partitionInfo, conf);
137 } else {
138 messageStore = new ByteArrayMessagesPerVertexStore<>(
139 messageValueFactory, partitionInfo, conf);
140 }
141 } else if (encodeAndStore.equals(
142 MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
143 messageStore = new PointerListPerVertexStore<>(
144 messageValueFactory, partitionInfo, conf);
145 }
146 }
147 return messageStore;
148 }
149
150 @Override
151 public MessageStore<I, M> newStore(
152 MessageClasses<I, M> messageClasses) {
153 Class<M> messageClass = messageClasses.getMessageClass();
154 MessageValueFactory<M> messageValueFactory =
155 messageClasses.createMessageValueFactory(conf);
156 MessageCombiner<? super I, M> messageCombiner =
157 messageClasses.createMessageCombiner(conf);
158 MessageStore messageStore;
159 if (messageCombiner != null) {
160 messageStore = newStoreWithCombiner(
161 messageClass, messageValueFactory, messageCombiner);
162 } else {
163 messageStore = newStoreWithoutCombiner(
164 messageClass, messageValueFactory,
165 messageClasses.getMessageEncodeAndStoreType());
166 }
167
168 if (LOG.isInfoEnabled()) {
169 LOG.info("newStore: Created " + messageStore.getClass() +
170 " for vertex id " + conf.getVertexIdClass() +
171 " and message value " + messageClass + " and" +
172 (messageCombiner != null ? " message combiner " +
173 messageCombiner.getClass() : " no combiner"));
174 }
175
176 int asyncMessageStoreThreads =
177 GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(conf);
178 if (asyncMessageStoreThreads > 0) {
179 messageStore = new AsyncMessageStoreWrapper(
180 messageStore,
181 partitionInfo.getPartitionIds(),
182 asyncMessageStoreThreads);
183 }
184
185 return messageStore;
186 }
187
188 @Override
189 public void initialize(PartitionSplitInfo<I> partitionInfo,
190 ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
191 this.partitionInfo = partitionInfo;
192 this.conf = conf;
193 }
194 }