This project has retired. For details please refer to its Attic page.
BasicPartitionOwner xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Map;
25  
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.worker.WorkerInfo;
29  
30  /**
31   * Basic partition owner, can be subclassed for more complicated partition
32   * owner implementations.
33   */
34  public class BasicPartitionOwner implements PartitionOwner,
35      ImmutableClassesGiraphConfigurable {
36    /** Configuration */
37    private ImmutableClassesGiraphConfiguration conf;
38    /** Partition id */
39    private int partitionId = -1;
40    /** Owning worker information */
41    private WorkerInfo workerInfo;
42    /** Previous (if any) worker info */
43    private WorkerInfo previousWorkerInfo;
44    /** Checkpoint files prefix for this partition */
45    private String checkpointFilesPrefix;
46  
47    /**
48     * Default constructor.
49     */
50    public BasicPartitionOwner() { }
51  
52    /**
53     * Constructor with partition id and worker info.
54     *
55     * @param partitionId Partition id of this partition.
56     * @param workerInfo Owner of the partition.
57     */
58    public BasicPartitionOwner(int partitionId, WorkerInfo workerInfo) {
59      this(partitionId, workerInfo, null, null);
60    }
61  
62    /**
63     * Constructor with partition id and worker info.
64     *
65     * @param partitionId Partition id of this partition.
66     * @param workerInfo Owner of the partition.
67     * @param previousWorkerInfo Previous owner of this partition.
68     * @param checkpointFilesPrefix Prefix of the checkpoint files.
69     */
70    public BasicPartitionOwner(int partitionId,
71                               WorkerInfo workerInfo,
72                               WorkerInfo previousWorkerInfo,
73                               String checkpointFilesPrefix) {
74      this.partitionId = partitionId;
75      this.workerInfo = workerInfo;
76      this.previousWorkerInfo = previousWorkerInfo;
77      this.checkpointFilesPrefix = checkpointFilesPrefix;
78    }
79  
80    @Override
81    public int getPartitionId() {
82      return partitionId;
83    }
84  
85    @Override
86    public WorkerInfo getWorkerInfo() {
87      return workerInfo;
88    }
89  
90    @Override
91    public void setWorkerInfo(WorkerInfo workerInfo) {
92      this.workerInfo = workerInfo;
93    }
94  
95    @Override
96    public WorkerInfo getPreviousWorkerInfo() {
97      return previousWorkerInfo;
98    }
99  
100   @Override
101   public void setPreviousWorkerInfo(WorkerInfo workerInfo) {
102     this.previousWorkerInfo = workerInfo;
103   }
104 
105   @Override
106   public void writeWithWorkerIds(DataOutput output) throws IOException {
107     output.writeInt(partitionId);
108     output.writeInt(workerInfo.getTaskId());
109     if (previousWorkerInfo != null) {
110       output.writeInt(previousWorkerInfo.getTaskId());
111     } else {
112       output.writeInt(-1);
113     }
114     if (checkpointFilesPrefix != null) {
115       output.writeBoolean(true);
116       output.writeUTF(checkpointFilesPrefix);
117     } else {
118       output.writeBoolean(false);
119     }
120   }
121 
122   @Override
123   public void readFieldsWithWorkerIds(DataInput input,
124       Map<Integer, WorkerInfo> workerInfoMap) throws IOException {
125     partitionId = input.readInt();
126     int workerId = input.readInt();
127     workerInfo = workerInfoMap.get(workerId);
128     int previousWorkerId = input.readInt();
129     if (previousWorkerId != -1) {
130       previousWorkerInfo = workerInfoMap.get(previousWorkerId);
131     }
132     boolean hasCheckpointFilePrefix = input.readBoolean();
133     if (hasCheckpointFilePrefix) {
134       checkpointFilesPrefix = input.readUTF();
135     }
136   }
137 
138   @Override
139   public void readFields(DataInput input) throws IOException {
140     partitionId = input.readInt();
141     workerInfo = new WorkerInfo();
142     workerInfo.readFields(input);
143     boolean hasPreviousWorkerInfo = input.readBoolean();
144     if (hasPreviousWorkerInfo) {
145       previousWorkerInfo = new WorkerInfo();
146       previousWorkerInfo.readFields(input);
147     }
148     boolean hasCheckpointFilePrefix = input.readBoolean();
149     if (hasCheckpointFilePrefix) {
150       checkpointFilesPrefix = input.readUTF();
151     }
152   }
153 
154   @Override
155   public void write(DataOutput output) throws IOException {
156     output.writeInt(partitionId);
157     workerInfo.write(output);
158     if (previousWorkerInfo != null) {
159       output.writeBoolean(true);
160       previousWorkerInfo.write(output);
161     } else {
162       output.writeBoolean(false);
163     }
164     if (checkpointFilesPrefix != null) {
165       output.writeBoolean(true);
166       output.writeUTF(checkpointFilesPrefix);
167     } else {
168       output.writeBoolean(false);
169     }
170   }
171 
172   @Override
173   public ImmutableClassesGiraphConfiguration getConf() {
174     return conf;
175   }
176 
177   @Override
178   public void setConf(ImmutableClassesGiraphConfiguration conf) {
179     this.conf = conf;
180   }
181 
182   @Override
183   public String toString() {
184     return "(id=" + partitionId + ",cur=" + workerInfo + ",prev=" +
185         previousWorkerInfo + ",ckpt_file=" + checkpointFilesPrefix + ")";
186   }
187 }