View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.internal;
19  
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Objects;
23  
24  import org.apache.giraph.block_app.framework.api.BlockApiHandle;
25  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
26  import org.apache.giraph.conf.DefaultMessageClasses;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.conf.MessageClasses;
30  import org.apache.giraph.factories.DefaultMessageValueFactory;
31  import org.apache.giraph.master.MasterCompute;
32  import org.apache.giraph.types.NoMessage;
33  import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
34  import org.apache.giraph.utils.WritableUtils;
35  import org.apache.giraph.worker.WorkerGlobalCommUsage;
36  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
37  import org.apache.hadoop.io.IntWritable;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.log4j.Logger;
40  
41  /**
42   * Pair of pieces to be executed on workers in a superstep
43   *
44   * @param <S> Execution stage type
45   */
46  @SuppressWarnings({ "rawtypes", "unchecked" })
47  public class BlockWorkerPieces<S> {
48    private static final Logger LOG = Logger.getLogger(BlockWorkerPieces.class);
49  
50    /** Aggregator holding next worker computation */
51    private static final
52    String NEXT_WORKER_PIECES = "giraph.blocks.next_worker_pieces";
53  
54    private final PairedPieceAndStage<S> receiver;
55    private final PairedPieceAndStage<S> sender;
56    private final BlockApiHandle blockApiHandle;
57  
58    public BlockWorkerPieces(
59        PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender,
60        BlockApiHandle blockApiHandle) {
61      this.receiver = receiver;
62      this.sender = sender;
63      this.blockApiHandle = blockApiHandle;
64    }
65  
66    public PairedPieceAndStage<S> getReceiver() {
67      return receiver;
68    }
69  
70    public PairedPieceAndStage<S> getSender() {
71      return sender;
72    }
73  
74    public BlockApiHandle getBlockApiHandle() {
75      return blockApiHandle;
76    }
77  
78    public MessageClasses getOutgoingMessageClasses(
79        ImmutableClassesGiraphConfiguration conf) {
80      MessageClasses messageClasses;
81      if (sender == null || sender.getPiece() == null) {
82        messageClasses = new DefaultMessageClasses(
83            NoMessage.class,
84            DefaultMessageValueFactory.class,
85            null,
86            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
87      } else {
88        messageClasses = sender.getPiece().getMessageClasses(conf);
89      }
90  
91      messageClasses.verifyConsistent(conf);
92      return messageClasses;
93    }
94  
95    @Override
96    public String toString() {
97      return "[receiver=" + receiver + ",sender=" + sender + "]";
98    }
99  
100   public String toStringShort() {
101     String receiverString =
102         Objects.toString(receiver != null ? receiver.getPiece() : null);
103     String senderString =
104         Objects.toString(sender != null ? sender.getPiece() : null);
105     if (receiverString.equals(senderString)) {
106       return "[receiver&sender=" + receiverString + "]";
107     } else {
108       return "[receiver=" + receiverString + ",sender=" + senderString + "]";
109     }
110   }
111 
112   /**
113    * Sets which WorkerComputation is going to be executed in the next superstep.
114    */
115   public static <S> void setNextWorkerPieces(
116       MasterCompute master, BlockWorkerPieces<S> nextWorkerPieces) {
117     Writable toBroadcast = new KryoWritableWrapper<>(nextWorkerPieces);
118     byte[] data = WritableUtils.toByteArrayUnsafe(toBroadcast);
119 
120     // TODO: extract splitting logic into common utility
121     int overhead = 4096;
122     int singleSize = Math.max(
123         overhead,
124         GiraphConstants.MAX_MSG_REQUEST_SIZE.get(master.getConf()) - overhead);
125 
126     ArrayList<byte[]> splittedData = new ArrayList<>();
127     if (data.length < singleSize) {
128       splittedData.add(data);
129     } else {
130       for (int start = 0; start < data.length; start += singleSize) {
131         splittedData.add(Arrays.copyOfRange(
132             data, start, Math.min(data.length, start + singleSize)));
133       }
134     }
135 
136     LOG.info("Next worker piece - total serialized size: " + data.length +
137         ", split into " + splittedData.size());
138     master.getContext().getCounter(
139         "PassedWorker Stats", "total serialized size")
140         .increment(data.length);
141     master.getContext().getCounter(
142         "PassedWorker Stats", "split parts")
143         .increment(splittedData.size());
144 
145     master.broadcast(NEXT_WORKER_PIECES, new IntWritable(splittedData.size()));
146 
147     for (int i = 0; i < splittedData.size(); i++) {
148       master.broadcast(NEXT_WORKER_PIECES + "_part_" + i,
149           KryoWritableWrapper.wrapIfNeeded(splittedData.get(i)));
150     }
151 
152     master.setOutgoingMessageClasses(
153         nextWorkerPieces.getOutgoingMessageClasses(master.getConf()));
154   }
155 
156   public static <S> BlockWorkerPieces<S> getNextWorkerPieces(
157       WorkerGlobalCommUsage worker) {
158     int splits = worker.<IntWritable>getBroadcast(NEXT_WORKER_PIECES).get();
159 
160     int totalLength = 0;
161     ArrayList<byte[]> splittedData = new ArrayList<>();
162     for (int i = 0; i < splits; i++) {
163       byte[] cur = KryoWritableWrapper.<byte[]>unwrapIfNeeded(
164           worker.getBroadcast(NEXT_WORKER_PIECES + "_part_" + i));
165       splittedData.add(cur);
166       totalLength += cur.length;
167     }
168 
169     byte[] merged;
170     if (splits == 1) {
171       merged = splittedData.get(0);
172     } else {
173       merged = new byte[totalLength];
174       int index = 0;
175       for (int i = 0; i < splits; i++) {
176         System.arraycopy(
177             splittedData.get(i), 0, merged, index, splittedData.get(i).length);
178         index += splittedData.get(i).length;
179       }
180     }
181 
182     KryoWritableWrapper<BlockWorkerPieces<S>> wrapper =
183         new KryoWritableWrapper<>();
184     WritableUtils.fromByteArrayUnsafe(
185         merged, wrapper, new UnsafeReusableByteArrayInput());
186     return wrapper.get();
187   }
188 }