This project has retired. For details please refer to its
Attic page.
PointerListPerVertexStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.messages;
20
21 import it.unimi.dsi.fastutil.longs.LongArrayList;
22
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.util.concurrent.ConcurrentMap;
27
28 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29 import org.apache.giraph.factories.MessageValueFactory;
30 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
31 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
32 import org.apache.giraph.utils.ExtendedDataOutput;
33 import org.apache.giraph.utils.VertexIdMessageIterator;
34 import org.apache.giraph.utils.VertexIdMessages;
35 import org.apache.hadoop.io.Writable;
36 import org.apache.hadoop.io.WritableComparable;
37
38
39
40
41
42
43
44
45
46 public class PointerListPerVertexStore<I extends WritableComparable,
47 M extends Writable> extends AbstractListPerVertexStore<I, M, LongArrayList> {
48
49
50 private final ExtendedByteArrayOutputBuffer bytesBuffer;
51
52
53
54
55
56
57
58
59 public PointerListPerVertexStore(
60 MessageValueFactory<M> messageValueFactory,
61 PartitionSplitInfo<I> partitionInfo,
62 ImmutableClassesGiraphConfiguration<I, ?, ?> config
63 ) {
64 super(messageValueFactory, partitionInfo, config);
65 bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
66 }
67
68 @Override
69 public boolean isPointerListEncoding() {
70 return true;
71 }
72
73 @Override
74 protected LongArrayList createList() {
75 return new LongArrayList();
76 }
77
78 @Override
79 public void addPartitionMessages(
80 int partitionId, VertexIdMessages<I, M> messages) {
81 try {
82 VertexIdMessageIterator<I, M> vertexIdMessageIterator =
83 messages.getVertexIdMessageIterator();
84 long pointer = 0;
85 LongArrayList list;
86 while (vertexIdMessageIterator.hasNext()) {
87 vertexIdMessageIterator.next();
88 M msg = vertexIdMessageIterator.getCurrentMessage();
89 list = getOrCreateList(vertexIdMessageIterator);
90 if (vertexIdMessageIterator.isNewMessage()) {
91 IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
92 pointer = indexAndDataOut.getIndex();
93 pointer <<= 32;
94 ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
95 pointer += dataOutput.getPos();
96 msg.write(dataOutput);
97 }
98 synchronized (list) {
99 list.add(pointer);
100 }
101 }
102 } catch (IOException e) {
103 throw new RuntimeException("addPartitionMessages: IOException while" +
104 " adding messages for a partition: " + e);
105 }
106 }
107
108 @Override
109 public void addMessage(I vertexId, M message) throws IOException {
110 LongArrayList list = getOrCreateList(vertexId);
111 IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
112 long pointer = indexAndDataOut.getIndex();
113 pointer <<= 32;
114 ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
115 pointer += dataOutput.getPos();
116 message.write(dataOutput);
117
118 synchronized (list) {
119 list.add(pointer);
120 }
121 }
122
123
124
125
126
127
128
129 @Override
130 public Iterable<M> getMessagesAsIterable(LongArrayList pointers) {
131 return new PointerListMessagesIterable<>(messageValueFactory, pointers,
132 bytesBuffer);
133 }
134
135 @Override
136 protected int getNumberOfMessagesIn(ConcurrentMap<I,
137 LongArrayList> partitionMap) {
138 int numberOfMessages = 0;
139 for (LongArrayList list : partitionMap.values()) {
140 numberOfMessages += list.size();
141 }
142 return numberOfMessages;
143 }
144
145
146 @Override
147 protected void writeMessages(LongArrayList messages, DataOutput out)
148 throws IOException {
149
150 }
151
152 @Override
153 protected LongArrayList readFieldsForMessages(DataInput in)
154 throws IOException {
155 return null;
156 }
157 }