This project has retired. For details please refer to its
Attic page.
MappingInputSplitsCallable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.graph.VertexEdgeCount;
25 import org.apache.giraph.io.GiraphInputFormat;
26 import org.apache.giraph.io.MappingInputFormat;
27 import org.apache.giraph.io.MappingReader;
28 import org.apache.giraph.mapping.MappingEntry;
29 import org.apache.giraph.mapping.MappingStore;
30 import org.apache.giraph.io.InputType;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.io.WritableComparable;
33 import org.apache.hadoop.mapreduce.InputSplit;
34 import org.apache.hadoop.mapreduce.Mapper;
35
36
37
38
39
40
41
42
43
44
45
46 @SuppressWarnings("unchecked")
47 public class MappingInputSplitsCallable<I extends WritableComparable,
48 V extends Writable, E extends Writable, B extends Writable>
49 extends InputSplitsCallable<I, V, E> {
50
51 private final MappingInputFormat<I, V, E, B> mappingInputFormat;
52
53 private final BspServiceWorker<I, V, E> bspServiceWorker;
54
55
56
57
58
59
60
61
62
63
64 public MappingInputSplitsCallable(
65 MappingInputFormat<I, V, E, B> mappingInputFormat,
66 Mapper<?, ?, ?, ?>.Context context,
67 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
68 BspServiceWorker<I, V, E> bspServiceWorker,
69 WorkerInputSplitsHandler splitsHandler) {
70 super(context, configuration, bspServiceWorker, splitsHandler);
71 this.mappingInputFormat = mappingInputFormat;
72 this.bspServiceWorker = bspServiceWorker;
73 }
74
75 @Override
76 public GiraphInputFormat getInputFormat() {
77 return mappingInputFormat;
78 }
79
80 @Override
81 public InputType getInputType() {
82 return InputType.MAPPING;
83 }
84
85 @Override
86 protected VertexEdgeCount readInputSplit(InputSplit inputSplit)
87 throws IOException, InterruptedException {
88 MappingReader<I, V, E, B> mappingReader =
89 mappingInputFormat.createMappingReader(inputSplit, context);
90 mappingReader.setConf(configuration);
91
92 WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker
93 .getAggregatorHandler().newThreadAggregatorUsage();
94
95 mappingReader.initialize(inputSplit, context);
96 mappingReader.setWorkerGlobalCommUsage(globalCommUsage);
97
98 int entriesLoaded = 0;
99 MappingStore<I, B> mappingStore =
100 (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
101
102 while (mappingReader.nextEntry()) {
103 MappingEntry<I, B> entry = mappingReader.getCurrentEntry();
104 entriesLoaded += 1;
105 mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
106 }
107 return new VertexEdgeCount(0, 0, entriesLoaded);
108 }
109 }