This project has retired. For details please refer to its Attic page.
AggregatorUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.aggregators;
20  
21  import java.util.List;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.worker.WorkerInfo;
25  
26  /**
27   * Class for aggregator constants and utility methods
28   */
29  public class AggregatorUtils {
30  
31    /** How big a single aggregator request can be (in bytes) */
32    public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =
33        "giraph.maxBytesPerAggregatorRequest";
34    /** Default max size of single aggregator request (1MB) */
35    public static final int MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT =
36        1024 * 1024;
37    /**
38     * Whether or not to have a copy of aggregators for each compute thread.
39     * Unless aggregators are very large and it would hurt the application to
40     * have that many copies of them, user should use thread-local aggregators
41     * to prevent synchronization when aggregate() is called (and get better
42     * performance because of it).
43     */
44    public static final String USE_THREAD_LOCAL_AGGREGATORS =
45        "giraph.useThreadLocalAggregators";
46    /** Default is not to have a copy of aggregators for each thread */
47    public static final boolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false;
48  
49    /** Do not instantiate */
50    private AggregatorUtils() { }
51  
52    /**
53     * Get owner of aggregator with selected name from the list of workers
54     *
55     * @param aggregatorName Name of the aggregators
56     * @param workers List of workers
57     * @return Worker which owns the aggregator
58     */
59    public static WorkerInfo getOwner(String aggregatorName,
60        List<WorkerInfo> workers) {
61      int index = Math.abs(aggregatorName.hashCode() % workers.size());
62      return workers.get(index);
63    }
64  
65    /**
66     * Check if we should use thread local aggregators.
67     *
68     * @param conf Giraph configuration
69     * @return True iff we should use thread local aggregators
70     */
71    public static boolean
72    useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) {
73      return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS,
74          USE_THREAD_LOCAL_AGGREGATORS_DEFAULT);
75    }
76  
77    /**
78     * Get the warning message about usage of unregistered aggregator to be
79     * printed to user. If user didn't register any aggregators also provide
80     * the explanation on how to do so.
81     *
82     * @param aggregatorName The name of the aggregator which user tried to
83     *                       access
84     * @param hasRegisteredAggregators True iff user registered some aggregators
85     * @param conf Giraph configuration
86     * @return Warning message
87     */
88    public static String getUnregisteredAggregatorMessage(
89        String aggregatorName, boolean hasRegisteredAggregators,
90        ImmutableClassesGiraphConfiguration conf) {
91      String message = "Tried to access aggregator which wasn't registered " +
92          aggregatorName;
93      if (!hasRegisteredAggregators) {
94        message = message + "; Aggregators can be registered in " +
95            "MasterCompute.initialize by calling " +
96            "registerAggregator(aggregatorName, aggregatorClass). " +
97            "Also be sure that you are correctly setting MasterCompute class, " +
98            "currently using " + conf.getMasterComputeClass().getName();
99      }
100     return message;
101   }
102 
103   /**
104    * Get the warning message about usage of unregistered reducer to be
105    * printed to user. If user didn't register any reducers also provide
106    * the explanation on how to do so.
107    *
108    * @param reducerName The name of the aggregator which user tried to
109    *                       access
110    * @param hasRegisteredReducers True iff user registered some aggregators
111    * @param conf Giraph configuration
112    * @return Warning message
113    */
114   public static String getUnregisteredReducerMessage(
115       String reducerName, boolean hasRegisteredReducers,
116       ImmutableClassesGiraphConfiguration conf) {
117     String message = "Tried to access reducer which wasn't registered " +
118         reducerName;
119     if (!hasRegisteredReducers) {
120       message = message + "; Aggregators can be registered from " +
121           "MasterCompute by calling registerReducer function. " +
122           "Also be sure that you are correctly setting MasterCompute class, " +
123           "currently using " + conf.getMasterComputeClass().getName();
124     }
125     return message;
126   }
127 
128   /**
129    * Get the warning message when user tries to access broadcast, without
130    * previously setting it, to be printed to user.
131    * If user didn't broadcast any value also provide
132    * the explanation on how to do so.
133    *
134    * @param broadcastName The name of the broadcast which user tried to
135    *                       access
136    * @param hasBroadcasted True iff user has broadcasted value before
137    * @param conf Giraph configuration
138    * @return Warning message
139    */
140   public static String getUnregisteredBroadcastMessage(
141       String broadcastName, boolean hasBroadcasted,
142       ImmutableClassesGiraphConfiguration conf) {
143     String message = "Tried to access broadcast which wasn't set before " +
144         broadcastName;
145     if (!hasBroadcasted) {
146       message = message + "; Values can be broadcasted from " +
147           "MasterCompute by calling broadcast function. " +
148           "Also be sure that you are correctly setting MasterCompute class, " +
149           "currently using " + conf.getMasterComputeClass().getName();
150     }
151     return message;
152   }
153 }