This project has retired. For details please refer to its Attic page.
SendWorkerOneMessageToManyRequest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.requests;
20  
21  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.Map.Entry;
27  
28  import org.apache.giraph.bsp.CentralizedServiceWorker;
29  import org.apache.giraph.comm.ServerData;
30  import org.apache.giraph.comm.messages.MessageStore;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.partition.PartitionOwner;
33  import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
34  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
35  import org.apache.giraph.utils.VertexIdMessageIterator;
36  import org.apache.hadoop.io.Writable;
37  import org.apache.hadoop.io.WritableComparable;
38  
39  /**
40   * Send a collection of ByteArrayOneMessageToManyIds messages to a worker.
41   *
42   * @param <I> Vertex id
43   * @param <M> Message data
44   */
45  @SuppressWarnings("unchecked")
46  public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
47      M extends Writable> extends WritableRequest<I, Writable, Writable>
48      implements WorkerRequest<I, Writable, Writable> {
49    /** ByteArrayOneMessageToManyIds encoding of vertexId &amp; messages */
50    protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
51  
52    /**
53     * Constructor used for reflection only.
54     */
55    public SendWorkerOneMessageToManyRequest() { }
56  
57    /**
58     * Constructor used to send request.
59     *
60     * @param oneMessageToManyIds ByteArrayOneMessageToManyIds
61     * @param conf ImmutableClassesGiraphConfiguration
62     */
63    public SendWorkerOneMessageToManyRequest(
64        ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
65        ImmutableClassesGiraphConfiguration conf) {
66      this.oneMessageToManyIds = oneMessageToManyIds;
67      setConf(conf);
68    }
69  
70    @Override
71    public RequestType getType() {
72      return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
73    }
74  
75    @Override
76    public void readFieldsRequest(DataInput input) throws IOException {
77      oneMessageToManyIds = new ByteArrayOneMessageToManyIds<>(
78          getConf().<M>createOutgoingMessageValueFactory());
79      oneMessageToManyIds.setConf(getConf());
80      oneMessageToManyIds.readFields(input);
81    }
82  
83    @Override
84    public void writeRequest(DataOutput output) throws IOException {
85      this.oneMessageToManyIds.write(output);
86    }
87  
88    @Override
89    public int getSerializedSize() {
90      return super.getSerializedSize() +
91          this.oneMessageToManyIds.getSerializedSize();
92    }
93  
94    @Override
95    public void doRequest(ServerData serverData) {
96      MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
97      if (messageStore.isPointerListEncoding()) {
98        // if message store is pointer list based then send data as is
99        messageStore.addPartitionMessages(-1, oneMessageToManyIds);
100     } else { // else split the data per partition and send individually
101       CentralizedServiceWorker<I, ?, ?> serviceWorker =
102           serverData.getServiceWorker();
103       // Get the initial size of ByteArrayVertexIdMessages per partition
104       // on this worker. To make sure every ByteArrayVertexIdMessages to have
105       // enough space to store the messages, we divide the original
106       // ByteArrayOneMessageToManyIds message size by the number of partitions
107       // and double the size
108       // (Assume the major component in ByteArrayOneMessageToManyIds message
109       // is a id list. Now each target id has a copy of message,
110       // therefore we double the buffer size)
111       // to get the initial size of ByteArrayVertexIdMessages.
112       int initialSize = oneMessageToManyIds.getSize() /
113           serverData.getPartitionStore().getNumPartitions() * 2;
114       // Create ByteArrayVertexIdMessages for
115       // message reformatting.
116       Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
117           new Int2ObjectOpenHashMap<>();
118 
119       // Put data from ByteArrayOneMessageToManyIds
120       // to ByteArrayVertexIdMessages
121       VertexIdMessageIterator<I, M> vertexIdMessageIterator =
122         oneMessageToManyIds.getVertexIdMessageIterator();
123       while (vertexIdMessageIterator.hasNext()) {
124         vertexIdMessageIterator.next();
125         M msg = vertexIdMessageIterator.getCurrentMessage();
126         I vertexId = vertexIdMessageIterator.getCurrentVertexId();
127         PartitionOwner owner =
128             serviceWorker.getVertexPartitionOwner(vertexId);
129         int partitionId = owner.getPartitionId();
130         ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
131             .get(partitionId);
132         if (idMsgs == null) {
133           idMsgs = new ByteArrayVertexIdMessages<>(
134               getConf().<M>createOutgoingMessageValueFactory());
135           idMsgs.setConf(getConf());
136           idMsgs.initialize(initialSize);
137           partitionIdMsgs.put(partitionId, idMsgs);
138         }
139         idMsgs.add(vertexId, msg);
140       }
141 
142       // Read ByteArrayVertexIdMessages and write to message store
143       for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
144           partitionIdMsgs.entrySet()) {
145         if (!idMsgs.getValue().isEmpty()) {
146           serverData.getIncomingMessageStore().addPartitionMessages(
147               idMsgs.getKey(), idMsgs.getValue());
148         }
149       }
150     }
151   }
152 }