View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master.input;
20  
21  import org.apache.giraph.comm.MasterClient;
22  import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
23  import org.apache.giraph.conf.StrConfOption;
24  import org.apache.giraph.io.GiraphInputFormat;
25  import org.apache.giraph.io.InputType;
26  import org.apache.giraph.worker.WorkerInfo;
27  import org.apache.hadoop.mapreduce.Counter;
28  import org.apache.hadoop.mapreduce.InputSplit;
29  import org.apache.hadoop.mapreduce.Mapper;
30  
31  import java.io.ByteArrayOutputStream;
32  import java.io.DataOutput;
33  import java.io.DataOutputStream;
34  import java.io.IOException;
35  import java.util.ArrayList;
36  import java.util.EnumMap;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.concurrent.CountDownLatch;
40  import java.util.concurrent.atomic.AtomicInteger;
41  
42  /**
43   * Handler for input splits on master
44   *
45   * Since currently Giraph fails if worker fails while reading input, we
46   * didn't complicate this part with retries yet, later it could be added by
47   * keeping track of which worker got which split and then if worker dies put
48   * these splits back to queues.
49   */
50  public class MasterInputSplitsHandler {
51    /**
52     * Store in counters timestamps when we finished reading
53     * these fractions of input
54     */
55    public static final StrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
56        new StrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
57            "0.99,1", "Store in counters timestamps when we finished reading " +
58            "these fractions of input");
59  
60    /** Whether to use locality information */
61    private final boolean useLocality;
62    /** Master client */
63    private MasterClient masterClient;
64    /** Master client */
65    private List<WorkerInfo> workers;
66    /** Map of splits organizers for each split type */
67    private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
68        new EnumMap<>(InputType.class);
69    /** Latches to say when one input splits type is ready to be accessed */
70    private Map<InputType, CountDownLatch> latchesMap =
71        new EnumMap<>(InputType.class);
72    /** Context for accessing counters */
73    private final Mapper.Context context;
74    /** How many splits per type are there total */
75    private final Map<InputType, Integer> numSplitsPerType =
76        new EnumMap<>(InputType.class);
77    /** How many splits per type have been read so far */
78    private final Map<InputType, AtomicInteger> numSplitsReadPerType =
79        new EnumMap<>(InputType.class);
80    /** Timestamps when various splits were created */
81    private final Map<InputType, Long> splitsCreatedTimestamp =
82        new EnumMap<>(InputType.class);
83    /**
84     * Store in counters timestamps when we finished reading
85     * these fractions of input
86     */
87    private final double[] doneFractionsToStoreInCounters;
88  
89    /**
90     * Constructor
91     *
92     * @param useLocality Whether to use locality information or not
93     * @param context Context for accessing counters
94     */
95    public MasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
96      this.useLocality = useLocality;
97      this.context = context;
98      for (InputType inputType : InputType.values()) {
99        latchesMap.put(inputType, new CountDownLatch(1));
100       numSplitsReadPerType.put(inputType, new AtomicInteger(0));
101     }
102 
103     String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get(
104         context.getConfiguration()).split(",");
105     doneFractionsToStoreInCounters = new double[tmp.length];
106     for (int i = 0; i < tmp.length; i++) {
107       doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
108     }
109   }
110 
111   /**
112    * Initialize
113    *
114    * @param masterClient Master client
115    * @param workers List of workers
116    */
117   public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
118     this.masterClient = masterClient;
119     this.workers = workers;
120   }
121 
122   /**
123    * Add splits
124    *
125    * @param splitsType Type of splits
126    * @param inputSplits Splits
127    * @param inputFormat Format
128    */
129   public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
130       GiraphInputFormat inputFormat) {
131     splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
132     List<byte[]> serializedSplits = new ArrayList<>();
133     for (InputSplit inputSplit : inputSplits) {
134       try {
135         ByteArrayOutputStream byteArrayOutputStream =
136             new ByteArrayOutputStream();
137         DataOutput outputStream =
138             new DataOutputStream(byteArrayOutputStream);
139         inputFormat.writeInputSplit(inputSplit, outputStream);
140         serializedSplits.add(byteArrayOutputStream.toByteArray());
141       } catch (IOException e) {
142         throw new IllegalStateException("IOException occurred", e);
143       }
144     }
145     InputSplitsMasterOrganizer inputSplitsOrganizer;
146     if (splitsType == InputType.MAPPING) {
147       inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
148           serializedSplits, workers);
149     } else {
150       inputSplitsOrganizer = useLocality ?
151           new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
152               inputSplits, workers) :
153           new BasicInputSplitsMasterOrganizer(serializedSplits);
154     }
155     splitsMap.put(splitsType, inputSplitsOrganizer);
156     latchesMap.get(splitsType).countDown();
157     numSplitsPerType.put(splitsType, serializedSplits.size());
158   }
159 
160   /**
161    * Called after we receive a split request from some worker, should send
162    * split back to it if available, or send it information that there is no
163    * more available
164    *
165    * @param splitType Type of split requested
166    * @param workerTaskId Id of worker who requested split
167    * @param isFirstSplit Whether this is the first split a thread is requesting,
168    *   or this request indicates that previously requested input split was done
169    */
170   public void sendSplitTo(InputType splitType, int workerTaskId,
171       boolean isFirstSplit) {
172     try {
173       // Make sure we don't try to retrieve splits before they were added
174       latchesMap.get(splitType).await();
175     } catch (InterruptedException e) {
176       throw new IllegalStateException("Interrupted", e);
177     }
178     byte[] serializedInputSplit =
179         splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
180     masterClient.sendWritableRequest(workerTaskId,
181         new ReplyWithInputSplitRequest(splitType,
182             serializedInputSplit == null ? new byte[0] : serializedInputSplit));
183     if (!isFirstSplit) {
184       incrementSplitsRead(splitType);
185     }
186   }
187 
188   /**
189    * Increment splits read
190    *
191    * @param splitType Type of split which was read
192    */
193   private void incrementSplitsRead(InputType splitType) {
194     int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
195     int splits = numSplitsPerType.get(splitType);
196     for (int i = 0; i < doneFractionsToStoreInCounters.length; i++) {
197       if (splitsRead == (int) (splits * doneFractionsToStoreInCounters[i])) {
198         splitFractionReached(
199             splitType, doneFractionsToStoreInCounters[i], context);
200       }
201     }
202   }
203 
204   /**
205    * Call when we reached some fraction of split type done to set the
206    * timestamp counter
207    *
208    * @param inputType Type of input
209    * @param fraction Which fraction of input type was done reading
210    * @param context Context for accessing counters
211    */
212   private void splitFractionReached(
213       InputType inputType, double fraction, Mapper.Context context) {
214     getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
215         System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
216   }
217 
218   /**
219    * Get counter
220    *
221    * @param inputType Type of input for counter
222    * @param fraction Fraction for counter
223    * @param context Context to get counter from
224    * @return Counter
225    */
226   public static Counter getSplitFractionDoneTimestampCounter(
227       InputType inputType, double fraction, Mapper.Context context) {
228     return context.getCounter(inputType.name() + " input",
229         String.format("%.2f%% done time (ms)", fraction * 100));
230   }
231 }