This project has retired. For details please refer to its Attic page.
MasterInputSplitsHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master.input;
20  
21  import org.apache.giraph.comm.MasterClient;
22  import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
23  import org.apache.giraph.conf.StrConfOption;
24  import org.apache.giraph.io.GiraphInputFormat;
25  import org.apache.giraph.io.InputType;
26  import org.apache.giraph.worker.WorkerInfo;
27  import org.apache.hadoop.mapreduce.Counter;
28  import org.apache.hadoop.mapreduce.InputSplit;
29  import org.apache.hadoop.mapreduce.Mapper;
30  
31  import java.io.ByteArrayOutputStream;
32  import java.io.DataOutput;
33  import java.io.DataOutputStream;
34  import java.io.IOException;
35  import java.util.ArrayList;
36  import java.util.EnumMap;
37  import java.util.HashSet;
38  import java.util.List;
39  import java.util.Map;
40  import java.util.Set;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.CountDownLatch;
43  import java.util.concurrent.atomic.AtomicInteger;
44  
45  /**
46   * Handler for input splits on master
47   *
48   * Since currently Giraph fails if worker fails while reading input, we
49   * didn't complicate this part with retries yet, later it could be added by
50   * keeping track of which worker got which split and then if worker dies put
51   * these splits back to queues.
52   */
53  public class MasterInputSplitsHandler {
54    /**
55     * Store in counters timestamps when we finished reading
56     * these fractions of input
57     */
58    public static final StrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
59        new StrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
60            "0.99,1", "Store in counters timestamps when we finished reading " +
61            "these fractions of input");
62    /** Map of counter group and names */
63    private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
64            new ConcurrentHashMap<>();
65  
66    /** Whether to use locality information */
67    private final boolean useLocality;
68    /** Master client */
69    private MasterClient masterClient;
70    /** Master client */
71    private List<WorkerInfo> workers;
72    /** Map of splits organizers for each split type */
73    private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
74        new EnumMap<>(InputType.class);
75    /** Latches to say when one input splits type is ready to be accessed */
76    private Map<InputType, CountDownLatch> latchesMap =
77        new EnumMap<>(InputType.class);
78    /** Context for accessing counters */
79    private final Mapper.Context context;
80    /** How many splits per type are there total */
81    private final Map<InputType, Integer> numSplitsPerType =
82        new EnumMap<>(InputType.class);
83    /** How many splits per type have been read so far */
84    private final Map<InputType, AtomicInteger> numSplitsReadPerType =
85        new EnumMap<>(InputType.class);
86    /** Timestamps when various splits were created */
87    private final Map<InputType, Long> splitsCreatedTimestamp =
88        new EnumMap<>(InputType.class);
89    /**
90     * Store in counters timestamps when we finished reading
91     * these fractions of input
92     */
93    private final double[] doneFractionsToStoreInCounters;
94  
95    /**
96     * Constructor
97     *
98     * @param useLocality Whether to use locality information or not
99     * @param context Context for accessing counters
100    */
101   public MasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
102     this.useLocality = useLocality;
103     this.context = context;
104     for (InputType inputType : InputType.values()) {
105       latchesMap.put(inputType, new CountDownLatch(1));
106       numSplitsReadPerType.put(inputType, new AtomicInteger(0));
107     }
108 
109     String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get(
110         context.getConfiguration()).split(",");
111     doneFractionsToStoreInCounters = new double[tmp.length];
112     for (int i = 0; i < tmp.length; i++) {
113       doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
114     }
115   }
116 
117   /**
118    * Initialize
119    *
120    * @param masterClient Master client
121    * @param workers List of workers
122    */
123   public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
124     this.masterClient = masterClient;
125     this.workers = workers;
126   }
127 
128   /**
129    * Add splits
130    *
131    * @param splitsType Type of splits
132    * @param inputSplits Splits
133    * @param inputFormat Format
134    */
135   public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
136       GiraphInputFormat inputFormat) {
137     splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
138     List<byte[]> serializedSplits = new ArrayList<>();
139     for (InputSplit inputSplit : inputSplits) {
140       try {
141         ByteArrayOutputStream byteArrayOutputStream =
142             new ByteArrayOutputStream();
143         DataOutput outputStream =
144             new DataOutputStream(byteArrayOutputStream);
145         inputFormat.writeInputSplit(inputSplit, outputStream);
146         serializedSplits.add(byteArrayOutputStream.toByteArray());
147       } catch (IOException e) {
148         throw new IllegalStateException("IOException occurred", e);
149       }
150     }
151     InputSplitsMasterOrganizer inputSplitsOrganizer;
152     if (splitsType == InputType.MAPPING) {
153       inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
154           serializedSplits, workers);
155     } else {
156       inputSplitsOrganizer = useLocality ?
157           new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
158               inputSplits, workers) :
159           new BasicInputSplitsMasterOrganizer(serializedSplits);
160     }
161     splitsMap.put(splitsType, inputSplitsOrganizer);
162     latchesMap.get(splitsType).countDown();
163     numSplitsPerType.put(splitsType, serializedSplits.size());
164   }
165 
166   /**
167    * Called after we receive a split request from some worker, should send
168    * split back to it if available, or send it information that there is no
169    * more available
170    *
171    * @param splitType Type of split requested
172    * @param workerTaskId Id of worker who requested split
173    * @param isFirstSplit Whether this is the first split a thread is requesting,
174    *   or this request indicates that previously requested input split was done
175    */
176   public void sendSplitTo(InputType splitType, int workerTaskId,
177       boolean isFirstSplit) {
178     try {
179       // Make sure we don't try to retrieve splits before they were added
180       latchesMap.get(splitType).await();
181     } catch (InterruptedException e) {
182       throw new IllegalStateException("Interrupted", e);
183     }
184     byte[] serializedInputSplit =
185         splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
186     masterClient.sendWritableRequest(workerTaskId,
187         new ReplyWithInputSplitRequest(splitType,
188             serializedInputSplit == null ? new byte[0] : serializedInputSplit));
189     if (!isFirstSplit) {
190       incrementSplitsRead(splitType);
191     }
192   }
193 
194   /**
195    * Increment splits read
196    *
197    * @param splitType Type of split which was read
198    */
199   private void incrementSplitsRead(InputType splitType) {
200     int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
201     int splits = numSplitsPerType.get(splitType);
202     for (int i = 0; i < doneFractionsToStoreInCounters.length; i++) {
203       if (splitsRead == (int) (splits * doneFractionsToStoreInCounters[i])) {
204         splitFractionReached(
205             splitType, doneFractionsToStoreInCounters[i], context);
206       }
207     }
208   }
209 
210   /**
211    * Call when we reached some fraction of split type done to set the
212    * timestamp counter
213    *
214    * @param inputType Type of input
215    * @param fraction Which fraction of input type was done reading
216    * @param context Context for accessing counters
217    */
218   private void splitFractionReached(
219       InputType inputType, double fraction, Mapper.Context context) {
220     getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
221         System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
222   }
223 
224   /**
225    * Get counter
226    *
227    * @param inputType Type of input for counter
228    * @param fraction Fraction for counter
229    * @param context Context to get counter from
230    * @return Counter
231    */
232   public static Counter getSplitFractionDoneTimestampCounter(
233       InputType inputType, double fraction, Mapper.Context context) {
234     String groupName = inputType.name() + " input";
235     String counterName = String.format("%.2f%% done time (ms)", fraction * 100);
236     Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
237             groupName, new HashSet<>());
238     counters.add(counterName);
239     COUNTER_GROUP_AND_NAMES.put(groupName, counters);
240     return context.getCounter(groupName, counterName);
241   }
242 
243   public static Map<String, Set<String>> getCounterGroupAndNames() {
244     return COUNTER_GROUP_AND_NAMES;
245   }
246 }