This project has been retired. For details, please refer to its
Attic page.
SendWorkerVerticesRequest xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.utils.ExtendedDataOutput;
24 import org.apache.giraph.utils.PairList;
25 import org.apache.giraph.utils.WritableUtils;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.log4j.Logger;
29
30 import java.io.DataInput;
31 import java.io.DataOutput;
32 import java.io.IOException;
33
34
35
36
37
38
39
40
41 @SuppressWarnings("rawtypes")
42 public class SendWorkerVerticesRequest<I extends WritableComparable,
43 V extends Writable, E extends Writable> extends
44 WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
45
46 private static final Logger LOG =
47 Logger.getLogger(SendWorkerVerticesRequest.class);
48
49 private PairList<Integer, ExtendedDataOutput> workerPartitions;
50
51
52
53
54 public SendWorkerVerticesRequest() { }
55
56
57
58
59
60
61
62 public SendWorkerVerticesRequest(
63 ImmutableClassesGiraphConfiguration<I, V, E> conf,
64 PairList<Integer, ExtendedDataOutput> workerPartitions) {
65 this.workerPartitions = workerPartitions;
66 setConf(conf);
67 }
68
69 @Override
70 public void readFieldsRequest(DataInput input) throws IOException {
71 int numPartitions = input.readInt();
72 workerPartitions = new PairList<Integer, ExtendedDataOutput>();
73 workerPartitions.initialize(numPartitions);
74 while (numPartitions-- > 0) {
75 final int partitionId = input.readInt();
76 ExtendedDataOutput partitionData =
77 WritableUtils.readExtendedDataOutput(input, getConf());
78 workerPartitions.add(partitionId, partitionData);
79 }
80 }
81
82 @Override
83 public void writeRequest(DataOutput output) throws IOException {
84 output.writeInt(workerPartitions.getSize());
85 PairList<Integer, ExtendedDataOutput>.Iterator
86 iterator = workerPartitions.getIterator();
87 while (iterator.hasNext()) {
88 iterator.next();
89 output.writeInt(iterator.getCurrentFirst());
90 WritableUtils.writeExtendedDataOutput(
91 iterator.getCurrentSecond(), output);
92 }
93 }
94
95 @Override
96 public RequestType getType() {
97 return RequestType.SEND_WORKER_VERTICES_REQUEST;
98 }
99
100 @Override
101 public void doRequest(ServerData<I, V, E> serverData) {
102 PairList<Integer, ExtendedDataOutput>.Iterator
103 iterator = workerPartitions.getIterator();
104 while (iterator.hasNext()) {
105 iterator.next();
106 serverData.getPartitionStore()
107 .addPartitionVertices(iterator.getCurrentFirst(),
108 iterator.getCurrentSecond());
109 }
110 }
111
112 @Override
113 public int getSerializedSize() {
114
115 int size = super.getSerializedSize() + 4;
116 PairList<Integer, ExtendedDataOutput>.Iterator iterator =
117 workerPartitions.getIterator();
118 while (iterator.hasNext()) {
119 iterator.next();
120
121 size += 8 + iterator.getCurrentSecond().getPos();
122 }
123 return size;
124 }
125 }
126