This project has retired. For details please refer to its Attic page.
      
 
AddressesAndPartitionsWritable xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.graph;
20  
21  import org.apache.giraph.partition.PartitionOwner;
22  import org.apache.giraph.master.MasterInfo;
23  import org.apache.giraph.utils.ReflectionUtils;
24  import org.apache.giraph.utils.WritableUtils;
25  import org.apache.giraph.worker.WorkerInfo;
26  import org.apache.hadoop.io.Writable;
27  
28  import com.google.common.collect.Iterables;
29  import com.google.common.collect.Lists;
30  import com.google.common.collect.Maps;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.IOException;
35  import java.util.Collection;
36  import java.util.List;
37  import java.util.Map;
38  
39  
40  
41  
42  public class AddressesAndPartitionsWritable implements Writable {
43    
44    private MasterInfo masterInfo;
45    
46    private List<WorkerInfo> workerInfos;
47    
48    private Collection<PartitionOwner> partitionOwners;
49  
50    
51  
52  
53  
54  
55  
56  
57    public AddressesAndPartitionsWritable(MasterInfo masterInfo,
58        List<WorkerInfo> workerInfos,
59        Collection<PartitionOwner> partitionOwners) {
60      this.masterInfo = masterInfo;
61      this.workerInfos = workerInfos;
62      this.partitionOwners = partitionOwners;
63    }
64  
65    
66    public AddressesAndPartitionsWritable() {
67    }
68  
69    
70  
71  
72  
73  
74    public MasterInfo getMasterInfo() {
75      return masterInfo;
76    }
77  
78    
79  
80  
81  
82  
83    public List<WorkerInfo> getWorkerInfos() {
84      return workerInfos;
85    }
86  
87    
88  
89  
90  
91  
92    public Collection<PartitionOwner> getPartitionOwners() {
93      return partitionOwners;
94    }
95  
96    @Override
97    public void write(DataOutput output) throws IOException {
98      masterInfo.write(output);
99  
100     output.writeInt(workerInfos.size());
101     for (WorkerInfo workerInfo : workerInfos) {
102       workerInfo.write(output);
103     }
104 
105     Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
106     
107     
108     List<WorkerInfo> previousWorkerInfos = Lists.newArrayList();
109     for (PartitionOwner partitionOwner : partitionOwners) {
110       if (partitionOwner.getPreviousWorkerInfo() != null) {
111         if (!workerInfoMap.containsKey(
112             partitionOwner.getPreviousWorkerInfo().getTaskId())) {
113           previousWorkerInfos.add(partitionOwner.getPreviousWorkerInfo());
114         }
115       }
116     }
117     output.writeInt(previousWorkerInfos.size());
118     for (WorkerInfo workerInfo : previousWorkerInfos) {
119       workerInfo.write(output);
120     }
121 
122     output.writeInt(partitionOwners.size());
123     if (partitionOwners.size() > 0) {
124       WritableUtils.writeClass(
125           partitionOwners.iterator().next().getClass(), output);
126     }
127     for (PartitionOwner partitionOwner : partitionOwners) {
128       partitionOwner.writeWithWorkerIds(output);
129     }
130   }
131 
132   @Override
133   public void readFields(DataInput input) throws IOException {
134     masterInfo = new MasterInfo();
135     masterInfo.readFields(input);
136 
137     int workerInfosSize = input.readInt();
138     workerInfos = Lists.newArrayListWithCapacity(workerInfosSize);
139     for (int i = 0; i < workerInfosSize; i++) {
140       WorkerInfo workerInfo = new WorkerInfo();
141       workerInfo.readFields(input);
142       workerInfos.add(workerInfo);
143     }
144 
145     Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
146     int additionalWorkerInfos = input.readInt();
147     for (int i = 0; i < additionalWorkerInfos; i++) {
148       WorkerInfo workerInfo = new WorkerInfo();
149       workerInfo.readFields(input);
150       workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
151     }
152 
153     int partitionOwnersSize = input.readInt();
154     Class<PartitionOwner> partitionOwnerClass = null;
155     if (partitionOwnersSize > 0) {
156       partitionOwnerClass = WritableUtils.readClass(input);
157     }
158     partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
159     for (int i = 0; i < partitionOwnersSize; i++) {
160       PartitionOwner partitionOwner =
161           ReflectionUtils.newInstance(partitionOwnerClass);
162       partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
163       partitionOwners.add(partitionOwner);
164     }
165   }
166 
167   
168 
169 
170 
171 
172 
173   private static Map<Integer, WorkerInfo> getAsWorkerInfoMap(
174       Iterable<WorkerInfo> workerInfos) {
175     Map<Integer, WorkerInfo> workerInfoMap =
176         Maps.newHashMapWithExpectedSize(Iterables.size(workerInfos));
177     for (WorkerInfo workerInfo : workerInfos) {
178       workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
179     }
180     return workerInfoMap;
181   }
182 }