This project has retired. For details please refer to its Attic page.
MappingInputSplitsMasterOrganizer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master.input;
20  
21  import org.apache.giraph.worker.WorkerInfo;
22  
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  /**
29   * Organizer for mapping splits on master. Mapping splits need all to be
30   * given to all workers, unlike vertex and edge splits which are read by
31   * exactly one worker each
32   */
33  public class MappingInputSplitsMasterOrganizer
34      implements InputSplitsMasterOrganizer {
35    /** List of splits */
36    private final List<byte[]> splits;
37    /** Map from worker task id to atomic pointer in splits list */
38    private final Map<Integer, AtomicInteger>
39        workerTaskIdToNextSplitIndexMap;
40  
41    /**
42     * Constructor
43     *
44     * @param serializedSplits Splits
45     * @param workers List of workers
46     */
47    public MappingInputSplitsMasterOrganizer(List<byte[]> serializedSplits,
48        List<WorkerInfo> workers) {
49      this.splits = serializedSplits;
50      workerTaskIdToNextSplitIndexMap = new HashMap<>();
51      for (WorkerInfo worker : workers) {
52        workerTaskIdToNextSplitIndexMap.put(
53            worker.getTaskId(), new AtomicInteger(0));
54      }
55    }
56  
57    @Override
58    public byte[] getSerializedSplitFor(int workerTaskId) {
59      AtomicInteger nextSplitIndex =
60          workerTaskIdToNextSplitIndexMap.get(workerTaskId);
61      int splitIndex = nextSplitIndex.getAndIncrement();
62      return splitIndex < splits.size() ? splits.get(splitIndex) : null;
63    }
64  }