This project has retired. For details please refer to its Attic page.
WorkerGraphPartitionerImpl xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import java.util.Collection;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  
26  import org.apache.giraph.worker.WorkerInfo;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  
30  import com.google.common.collect.Lists;
31  import org.apache.log4j.Logger;
32  
33  /**
34   * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
35   * user function - getPartitionIndex.
36   *
37   * @param <I> Vertex id type
38   * @param <V> Vertex value type
39   * @param <E> Edge value type
40   */
41  public abstract class WorkerGraphPartitionerImpl<I extends WritableComparable,
42      V extends Writable, E extends Writable>
43      implements WorkerGraphPartitioner<I, V, E> {
44    /** Logger instance */
45    private static final Logger LOG = Logger.getLogger(
46        WorkerGraphPartitionerImpl.class);
47    /** List of {@link PartitionOwner}s for this worker. */
48    private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
49    /** List of available workers */
50    private Set<WorkerInfo> availableWorkers = new HashSet<>();
51  
52    @Override
53    public PartitionOwner createPartitionOwner() {
54      return new BasicPartitionOwner();
55    }
56  
57    @Override
58    public PartitionOwner getPartitionOwner(I vertexId) {
59      return partitionOwnerList.get(
60          getPartitionIndex(vertexId, partitionOwnerList.size(),
61              availableWorkers.size()));
62    }
63  
64    @Override
65    public Collection<PartitionStats> finalizePartitionStats(
66        Collection<PartitionStats> workerPartitionStats,
67        PartitionStore<I, V, E> partitionStore) {
68      // No modification necessary
69      return workerPartitionStats;
70    }
71  
72    @Override
73    public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
74        Collection<? extends PartitionOwner> masterSetPartitionOwners) {
75      PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
76          partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
77      extractAvailableWorkers();
78      return exchange;
79    }
80  
81    @Override
82    public Collection<? extends PartitionOwner> getPartitionOwners() {
83      return partitionOwnerList;
84    }
85  
86    /**
87     * Update availableWorkers
88     */
89    public void extractAvailableWorkers() {
90      availableWorkers.clear();
91      for (PartitionOwner partitionOwner : partitionOwnerList) {
92        availableWorkers.add(partitionOwner.getWorkerInfo());
93      }
94      LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
95          " workers are available");
96    }
97  
98    /**
99     * Calculates in which partition current vertex belongs to,
100    * from interval [0, partitionCount).
101    *
102    * @param id Vertex id
103    * @param partitionCount Number of partitions
104    * @param workerCount Number of active workers
105    * @return partition
106    */
107   protected abstract int getPartitionIndex(I id, int partitionCount,
108     int workerCount);
109 }