This project has retired. For details please refer to its Attic page.
AddressesAndPartitionsWritable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import org.apache.giraph.partition.PartitionOwner;
22  import org.apache.giraph.master.MasterInfo;
23  import org.apache.giraph.utils.ReflectionUtils;
24  import org.apache.giraph.utils.WritableUtils;
25  import org.apache.giraph.worker.WorkerInfo;
26  import org.apache.hadoop.io.Writable;
27  
28  import com.google.common.collect.Iterables;
29  import com.google.common.collect.Lists;
30  import com.google.common.collect.Maps;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.IOException;
35  import java.util.Collection;
36  import java.util.List;
37  import java.util.Map;
38  
39  /**
40   * Helper class to write descriptions of master, workers and partition owners
41   */
42  public class AddressesAndPartitionsWritable implements Writable {
43    /** Master information */
44    private MasterInfo masterInfo;
45    /** List of all workers */
46    private List<WorkerInfo> workerInfos;
47    /** Collection of partitions */
48    private Collection<PartitionOwner> partitionOwners;
49  
50    /**
51     * Constructor when we want to serialize object
52     *
53     * @param masterInfo Master information
54     * @param workerInfos List of all workers
55     * @param partitionOwners Collection of partitions
56     */
57    public AddressesAndPartitionsWritable(MasterInfo masterInfo,
58        List<WorkerInfo> workerInfos,
59        Collection<PartitionOwner> partitionOwners) {
60      this.masterInfo = masterInfo;
61      this.workerInfos = workerInfos;
62      this.partitionOwners = partitionOwners;
63    }
64  
65    /** Constructor for reflection */
66    public AddressesAndPartitionsWritable() {
67    }
68  
69    /**
70     * Get master information
71     *
72     * @return Master information
73     */
74    public MasterInfo getMasterInfo() {
75      return masterInfo;
76    }
77  
78    /**
79     * Get all workers
80     *
81     * @return List of all workers
82     */
83    public List<WorkerInfo> getWorkerInfos() {
84      return workerInfos;
85    }
86  
87    /**
88     * Get partition owners
89     *
90     * @return Collection of partition owners
91     */
92    public Collection<PartitionOwner> getPartitionOwners() {
93      return partitionOwners;
94    }
95  
96    @Override
97    public void write(DataOutput output) throws IOException {
98      masterInfo.write(output);
99  
100     output.writeInt(workerInfos.size());
101     for (WorkerInfo workerInfo : workerInfos) {
102       workerInfo.write(output);
103     }
104 
105     Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
106     // Also write out the previous worker information that are used
107     // in the partition owners
108     List<WorkerInfo> previousWorkerInfos = Lists.newArrayList();
109     for (PartitionOwner partitionOwner : partitionOwners) {
110       if (partitionOwner.getPreviousWorkerInfo() != null) {
111         if (!workerInfoMap.containsKey(
112             partitionOwner.getPreviousWorkerInfo().getTaskId())) {
113           previousWorkerInfos.add(partitionOwner.getPreviousWorkerInfo());
114         }
115       }
116     }
117     output.writeInt(previousWorkerInfos.size());
118     for (WorkerInfo workerInfo : previousWorkerInfos) {
119       workerInfo.write(output);
120     }
121 
122     output.writeInt(partitionOwners.size());
123     if (partitionOwners.size() > 0) {
124       WritableUtils.writeClass(
125           partitionOwners.iterator().next().getClass(), output);
126     }
127     for (PartitionOwner partitionOwner : partitionOwners) {
128       partitionOwner.writeWithWorkerIds(output);
129     }
130   }
131 
132   @Override
133   public void readFields(DataInput input) throws IOException {
134     masterInfo = new MasterInfo();
135     masterInfo.readFields(input);
136 
137     int workerInfosSize = input.readInt();
138     workerInfos = Lists.newArrayListWithCapacity(workerInfosSize);
139     for (int i = 0; i < workerInfosSize; i++) {
140       WorkerInfo workerInfo = new WorkerInfo();
141       workerInfo.readFields(input);
142       workerInfos.add(workerInfo);
143     }
144 
145     Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
146     int additionalWorkerInfos = input.readInt();
147     for (int i = 0; i < additionalWorkerInfos; i++) {
148       WorkerInfo workerInfo = new WorkerInfo();
149       workerInfo.readFields(input);
150       workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
151     }
152 
153     int partitionOwnersSize = input.readInt();
154     Class<PartitionOwner> partitionOwnerClass = null;
155     if (partitionOwnersSize > 0) {
156       partitionOwnerClass = WritableUtils.readClass(input);
157     }
158     partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
159     for (int i = 0; i < partitionOwnersSize; i++) {
160       PartitionOwner partitionOwner =
161           ReflectionUtils.newInstance(partitionOwnerClass);
162       partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
163       partitionOwners.add(partitionOwner);
164     }
165   }
166 
167   /**
168    * Convert Iterable of WorkerInfos to the map from task id to WorkerInfo.
169    *
170    * @param workerInfos Iterable of WorkerInfos
171    * @return The map from task id to WorkerInfo
172    */
173   private static Map<Integer, WorkerInfo> getAsWorkerInfoMap(
174       Iterable<WorkerInfo> workerInfos) {
175     Map<Integer, WorkerInfo> workerInfoMap =
176         Maps.newHashMapWithExpectedSize(Iterables.size(workerInfos));
177     for (WorkerInfo workerInfo : workerInfos) {
178       workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
179     }
180     return workerInfoMap;
181   }
182 }