This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.graph.VertexEdgeCount;
25  import org.apache.giraph.io.GiraphInputFormat;
26  import org.apache.giraph.io.MappingInputFormat;
27  import org.apache.giraph.io.MappingReader;
28  import org.apache.giraph.mapping.MappingEntry;
29  import org.apache.giraph.mapping.MappingStore;
30  import org.apache.giraph.io.InputType;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.hadoop.io.WritableComparable;
33  import org.apache.hadoop.mapreduce.InputSplit;
34  import org.apache.hadoop.mapreduce.Mapper;
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  @SuppressWarnings("unchecked")
47  public class MappingInputSplitsCallable<I extends WritableComparable,
48    V extends Writable, E extends Writable, B extends Writable>
49    extends InputSplitsCallable<I, V, E> {
50    
51    private final MappingInputFormat<I, V, E, B> mappingInputFormat;
52    
53    private final BspServiceWorker<I, V, E> bspServiceWorker;
54  
55    
56  
57  
58  
59  
60  
61  
62  
63  
64    public MappingInputSplitsCallable(
65        MappingInputFormat<I, V, E, B> mappingInputFormat,
66        Mapper<?, ?, ?, ?>.Context context,
67        ImmutableClassesGiraphConfiguration<I, V, E> configuration,
68        BspServiceWorker<I, V, E> bspServiceWorker,
69        WorkerInputSplitsHandler splitsHandler) {
70      super(context, configuration, bspServiceWorker, splitsHandler);
71      this.mappingInputFormat = mappingInputFormat;
72      this.bspServiceWorker = bspServiceWorker;
73    }
74  
75    @Override
76    public GiraphInputFormat getInputFormat() {
77      return mappingInputFormat;
78    }
79  
80    @Override
81    public InputType getInputType() {
82      return InputType.MAPPING;
83    }
84  
85    @Override
86    protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
87      throws IOException, InterruptedException {
88      MappingReader<I, V, E, B> mappingReader =
89          mappingInputFormat.createMappingReader(inputSplit, context);
90      mappingReader.setConf(configuration);
91  
92      WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
93          .getAggregatorHandler().newThreadAggregatorUsage();
94  
95      mappingReader.initialize(inputSplit, context);
96      mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
97  
98      int entriesLoaded = 0;
99      MappingStore<I, B> mappingStore =
100       (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
101 
102     while (mappingReader.nextEntry()) {
103       MappingEntry<I, B> entry = mappingReader.getCurrentEntry();
104       entriesLoaded += 1;
105       mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
106     }
107     return new VertexEdgeCount(0, 0, entriesLoaded);
108   }
109 }