This project has retired. For details please refer to its Attic page.
MappingInputSplitsCallable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.graph.VertexEdgeCount;
25  import org.apache.giraph.io.GiraphInputFormat;
26  import org.apache.giraph.io.MappingInputFormat;
27  import org.apache.giraph.io.MappingReader;
28  import org.apache.giraph.mapping.MappingEntry;
29  import org.apache.giraph.mapping.MappingStore;
30  import org.apache.giraph.io.InputType;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.hadoop.io.WritableComparable;
33  import org.apache.hadoop.mapreduce.InputSplit;
34  import org.apache.hadoop.mapreduce.Mapper;
35  
36  /**
37   * Load as many mapping input splits as possible.
38   * Every thread will has its own instance of WorkerClientRequestProcessor
39   * to send requests.
40   *
41   * @param <I> vertexId type
42   * @param <V> vertexValue type
43   * @param <E> edgeValue type
44   * @param <B> mappingTarget type
45   */
46  @SuppressWarnings("unchecked")
47  public class MappingInputSplitsCallable<I extends WritableComparable,
48    V extends Writable, E extends Writable, B extends Writable>
49    extends InputSplitsCallable<I, V, E> {
50    /** User supplied mappingInputFormat */
51    private final MappingInputFormat<I, V, E, B> mappingInputFormat;
52    /** Link to bspServiceWorker */
53    private final BspServiceWorker<I, V, E> bspServiceWorker;
54  
55    /**
56     * Constructor
57     *
58     * @param mappingInputFormat mappingInputFormat
59     * @param context Context
60     * @param configuration Configuration
61     * @param bspServiceWorker bsp service worker
62     * @param splitsHandler Splits handler
63     */
64    public MappingInputSplitsCallable(
65        MappingInputFormat<I, V, E, B> mappingInputFormat,
66        Mapper<?, ?, ?, ?>.Context context,
67        ImmutableClassesGiraphConfiguration<I, V, E> configuration,
68        BspServiceWorker<I, V, E> bspServiceWorker,
69        WorkerInputSplitsHandler splitsHandler) {
70      super(context, configuration, bspServiceWorker, splitsHandler);
71      this.mappingInputFormat = mappingInputFormat;
72      this.bspServiceWorker = bspServiceWorker;
73    }
74  
75    @Override
76    public GiraphInputFormat getInputFormat() {
77      return mappingInputFormat;
78    }
79  
80    @Override
81    public InputType getInputType() {
82      return InputType.MAPPING;
83    }
84  
85    @Override
86    protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
87      throws IOException, InterruptedException {
88      MappingReader<I, V, E, B> mappingReader =
89          mappingInputFormat.createMappingReader(inputSplit, context);
90      mappingReader.setConf(configuration);
91  
92      WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
93          .getAggregatorHandler().newThreadAggregatorUsage();
94  
95      mappingReader.initialize(inputSplit, context);
96      mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
97  
98      int entriesLoaded = 0;
99      MappingStore<I, B> mappingStore =
100       (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
101 
102     while (mappingReader.nextEntry()) {
103       MappingEntry<I, B> entry = mappingReader.getCurrentEntry();
104       entriesLoaded += 1;
105       mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
106     }
107     return new VertexEdgeCount(0, 0, entriesLoaded);
108   }
109 }