View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master.input;
20  
21  import org.apache.giraph.comm.MasterClient;
22  import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
23  import org.apache.giraph.io.GiraphInputFormat;
24  import org.apache.giraph.io.InputType;
25  import org.apache.giraph.worker.WorkerInfo;
26  import org.apache.hadoop.mapreduce.InputSplit;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.DataOutput;
30  import java.io.DataOutputStream;
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import java.util.EnumMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.concurrent.CountDownLatch;
37  
38  /**
39   * Handler for input splits on master
40   *
41   * Since currently Giraph fails if worker fails while reading input, we
42   * didn't complicate this part with retries yet, later it could be added by
43   * keeping track of which worker got which split and then if worker dies put
44   * these splits back to queues.
45   */
46  public class MasterInputSplitsHandler {
47    /** Whether to use locality information */
48    private final boolean useLocality;
49    /** Master client */
50    private MasterClient masterClient;
51    /** Master client */
52    private List<WorkerInfo> workers;
53    /** Map of splits organizers for each split type */
54    private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
55        new EnumMap<>(InputType.class);
56    /** Latches to say when one input splits type is ready to be accessed */
57    private Map<InputType, CountDownLatch> latchesMap =
58        new EnumMap<>(InputType.class);
59  
60    /**
61     * Constructor
62     *
63     * @param useLocality Whether to use locality information or not
64     */
65    public MasterInputSplitsHandler(boolean useLocality) {
66      this.useLocality = useLocality;
67      for (InputType inputType : InputType.values()) {
68        latchesMap.put(inputType, new CountDownLatch(1));
69      }
70    }
71  
72    /**
73     * Initialize
74     *
75     * @param masterClient Master client
76     * @param workers List of workers
77     */
78    public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
79      this.masterClient = masterClient;
80      this.workers = workers;
81    }
82  
83    /**
84     * Add splits
85     *
86     * @param splitsType Type of splits
87     * @param inputSplits Splits
88     * @param inputFormat Format
89     */
90    public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
91        GiraphInputFormat inputFormat) {
92      List<byte[]> serializedSplits = new ArrayList<>();
93      for (InputSplit inputSplit : inputSplits) {
94        try {
95          ByteArrayOutputStream byteArrayOutputStream =
96              new ByteArrayOutputStream();
97          DataOutput outputStream =
98              new DataOutputStream(byteArrayOutputStream);
99          inputFormat.writeInputSplit(inputSplit, outputStream);
100         serializedSplits.add(byteArrayOutputStream.toByteArray());
101       } catch (IOException e) {
102         throw new IllegalStateException("IOException occurred", e);
103       }
104     }
105     InputSplitsMasterOrganizer inputSplitsOrganizer;
106     if (splitsType == InputType.MAPPING) {
107       inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
108           serializedSplits, workers);
109     } else {
110       inputSplitsOrganizer = useLocality ?
111           new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
112               inputSplits, workers) :
113           new BasicInputSplitsMasterOrganizer(serializedSplits);
114     }
115     splitsMap.put(splitsType, inputSplitsOrganizer);
116     latchesMap.get(splitsType).countDown();
117   }
118 
119   /**
120    * Called after we receive a split request from some worker, should send
121    * split back to it if available, or send it information that there is no
122    * more available
123    *
124    * @param splitType Type of split requested
125    * @param workerTaskId Id of worker who requested split
126    */
127   public void sendSplitTo(InputType splitType, int workerTaskId) {
128     try {
129       // Make sure we don't try to retrieve splits before they were added
130       latchesMap.get(splitType).await();
131     } catch (InterruptedException e) {
132       throw new IllegalStateException("Interrupted", e);
133     }
134     byte[] serializedInputSplit =
135         splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
136     masterClient.sendWritableRequest(workerTaskId,
137         new ReplyWithInputSplitRequest(splitType,
138             serializedInputSplit == null ? new byte[0] : serializedInputSplit));
139   }
140 }