1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.worker;
20
21 import org.apache.giraph.comm.WorkerClient;
22 import org.apache.giraph.comm.requests.AskForInputSplitRequest;
23 import org.apache.giraph.io.InputType;
24
25 import java.util.EnumMap;
26 import java.util.Map;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 /**
31 * Requests splits from master and keeps track of them
32 */
33 public class WorkerInputSplitsHandler {
34 /** Worker info of this worker */
35 private final WorkerInfo workerInfo;
36 /** Task id of master */
37 private final int masterTaskId;
38 /** Worker client, used for communication */
39 private final WorkerClient workerClient;
40 /** Map with currently available splits received from master */
41 private final Map<InputType, BlockingQueue<byte[]>> availableInputSplits;
42
43 /**
44 * Constructor
45 *
46 * @param workerInfo Worker info of this worker
47 * @param masterTaskId Task id of master
48 * @param workerClient Worker client, used for communication
49 */
50 public WorkerInputSplitsHandler(WorkerInfo workerInfo, int masterTaskId,
51 WorkerClient workerClient) {
52 this.workerInfo = workerInfo;
53 this.masterTaskId = masterTaskId;
54 this.workerClient = workerClient;
55 availableInputSplits = new EnumMap<>(InputType.class);
56 for (InputType inputType : InputType.values()) {
57 availableInputSplits.put(
58 inputType, new LinkedBlockingQueue<byte[]>());
59 }
60 }
61
62 /**
63 * Called when an input split has been received from master, adding it to
64 * the map
65 *
66 * @param splitType Type of split
67 * @param serializedInputSplit Split
68 */
69 public void receivedInputSplit(InputType splitType,
70 byte[] serializedInputSplit) {
71 try {
72 availableInputSplits.get(splitType).put(serializedInputSplit);
73 } catch (InterruptedException e) {
74 throw new IllegalStateException("Interrupted", e);
75 }
76 }
77
78 /**
79 * Try to reserve an InputSplit for loading. While InputSplits exists that
80 * are not finished, wait until they are.
81 *
82 * NOTE: iterations on the InputSplit list only halt for each worker when it
83 * has scanned the entire list once and found every split marked RESERVED.
84 * When a worker fails, its Ephemeral RESERVED znodes will disappear,
85 * allowing other iterating workers to claim it's previously read splits.
86 * Only when the last worker left iterating on the list fails can a danger
87 * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
88 * causes job failure, this is OK. As the failure model evolves, this
89 * behavior might need to change. We could add watches on
90 * inputSplitFinishedNodes and stop iterating only when all these nodes
91 * have been created.
92 *
93 * @param splitType Type of split
94 * @param isFirstSplit Whether this is the first split input thread reads
95 * @return reserved InputSplit or null if no unfinished InputSplits exist
96 */
97 public byte[] reserveInputSplit(InputType splitType, boolean isFirstSplit) {
98 // Send request
99 workerClient.sendWritableRequest(masterTaskId,
100 new AskForInputSplitRequest(
101 splitType, workerInfo.getTaskId(), isFirstSplit));
102 try {
103 // Wait for some split to become available
104 byte[] serializedInputSplit = availableInputSplits.get(splitType).take();
105 return serializedInputSplit.length == 0 ? null : serializedInputSplit;
106 } catch (InterruptedException e) {
107 throw new IllegalStateException("Interrupted", e);
108 }
109 }
110 }