This project has retired. For details please refer to its
Attic page.
SimpleMigrationMasterBlockFactory xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.examples.block_app;
19
20 import java.io.IOException;
21
22 import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
23 import org.apache.giraph.block_app.framework.block.Block;
24 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullBasicComputation;
25 import org.apache.giraph.block_app.migration.MigrationFullBlockFactory;
26 import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
27 import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
28 import org.apache.giraph.conf.GiraphConfiguration;
29 import org.apache.giraph.graph.Vertex;
30 import org.apache.hadoop.io.DoubleWritable;
31 import org.apache.hadoop.io.FloatWritable;
32 import org.apache.hadoop.io.LongWritable;
33 import org.apache.log4j.Logger;
34
35
36
37
38
39 public class SimpleMigrationMasterBlockFactory
40 extends MigrationFullBlockFactory {
41 @Override
42 public Block createBlock(GiraphConfiguration conf) {
43 return createMigrationAppBlock(
44 SimpleMigrationMasterComputation.class,
45 new SimpleMigrationMasterCompute(),
46 DoubleWritable.class,
47 null,
48 conf);
49 }
50
51 @Override
52 protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
53 return LongWritable.class;
54 }
55
56 @Override
57 protected Class<DoubleWritable> getVertexValueClass(
58 GiraphConfiguration conf) {
59 return DoubleWritable.class;
60 }
61
62 @Override
63 protected Class<FloatWritable> getEdgeValueClass(GiraphConfiguration conf) {
64 return FloatWritable.class;
65 }
66
67 @Override
68 protected
69 Class<SimpleMigrationMasterWorkerContext> getWorkerContextValueClass(
70 GiraphConfiguration conf) {
71 return SimpleMigrationMasterWorkerContext.class;
72 }
73
74
75
76
77
78
79
80
81 public static class SimpleMigrationMasterComputation
82 extends MigrationFullBasicComputation<LongWritable, DoubleWritable,
83 FloatWritable, DoubleWritable> {
84
85 public static final String SMC_AGG = "simplemastercompute.aggregator";
86
87
88 private static final Logger LOG =
89 Logger.getLogger(SimpleMigrationMasterComputation.class);
90
91 @Override
92 public void compute(
93 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
94 Iterable<DoubleWritable> messages) throws IOException {
95 double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
96 double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
97 double newSum = oldSum + newValue;
98 vertex.setValue(new DoubleWritable(newSum));
99 SimpleMigrationMasterWorkerContext workerContext = getWorkerContext();
100 workerContext.setFinalSum(newSum);
101 LOG.info("Current sum: " + newSum);
102 }
103 }
104
105
106
107
108 public static class SimpleMigrationMasterWorkerContext
109 extends MigrationFullWorkerContext {
110
111 private static double FINAL_SUM;
112
113 @Override
114 public void preApplication()
115 throws InstantiationException, IllegalAccessException {
116 }
117
118 @Override
119 public void preSuperstep() {
120 }
121
122 @Override
123 public void postSuperstep() {
124 }
125
126 @Override
127 public void postApplication() {
128 }
129
130 public static void setFinalSum(double sum) {
131 FINAL_SUM = sum;
132 }
133
134 public static double getFinalSum() {
135 return FINAL_SUM;
136 }
137 }
138
139
140
141
142 public static class SimpleMigrationMasterCompute
143 extends MigrationFullMasterCompute {
144 @Override
145 public void compute() {
146 setAggregatedValue(SimpleMigrationMasterComputation.SMC_AGG,
147 new DoubleWritable(((double) getSuperstep()) / 2 + 1));
148 if (getSuperstep() == 10) {
149 haltComputation();
150 }
151 }
152
153 @Override
154 public void initialize() throws InstantiationException,
155 IllegalAccessException {
156 registerAggregator(SimpleMigrationMasterComputation.SMC_AGG,
157 DoubleOverwriteAggregator.class);
158 }
159 }
160 }