1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.aggregators;
2021import java.util.List;
2223import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.worker.WorkerInfo;
2526/**27 * Class for aggregator constants and utility methods28 */29publicclassAggregatorUtils {
3031/** How big a single aggregator request can be (in bytes) */32publicstaticfinal String MAX_BYTES_PER_AGGREGATOR_REQUEST =
33"giraph.maxBytesPerAggregatorRequest";
34/** Default max size of single aggregator request (1MB) */35publicstaticfinalint MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT =
36 1024 * 1024;
37/**38 * Whether or not to have a copy of aggregators for each compute thread.39 * Unless aggregators are very large and it would hurt the application to40 * have that many copies of them, user should use thread-local aggregators41 * to prevent synchronization when aggregate() is called (and get better42 * performance because of it).43 */44publicstaticfinal String USE_THREAD_LOCAL_AGGREGATORS =
45"giraph.useThreadLocalAggregators";
46/** Default is not to have a copy of aggregators for each thread */47publicstaticfinalboolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false;
4849/** Do not instantiate */50privateAggregatorUtils() { }
5152/**53 * Get owner of aggregator with selected name from the list of workers54 *55 * @param aggregatorName Name of the aggregators56 * @param workers List of workers57 * @return Worker which owns the aggregator58 */59publicstaticWorkerInfo getOwner(String aggregatorName,
60 List<WorkerInfo> workers) {
61int index = Math.abs(aggregatorName.hashCode() % workers.size());
62return workers.get(index);
63 }
6465/**66 * Check if we should use thread local aggregators.67 *68 * @param conf Giraph configuration69 * @return True iff we should use thread local aggregators70 */71publicstaticboolean72 useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) {
73return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS,
74 USE_THREAD_LOCAL_AGGREGATORS_DEFAULT);
75 }
7677/**78 * Get the warning message about usage of unregistered aggregator to be79 * printed to user. If user didn't register any aggregators also provide80 * the explanation on how to do so.81 *82 * @param aggregatorName The name of the aggregator which user tried to83 * access84 * @param hasRegisteredAggregators True iff user registered some aggregators85 * @param conf Giraph configuration86 * @return Warning message87 */88publicstatic String getUnregisteredAggregatorMessage(
89 String aggregatorName, boolean hasRegisteredAggregators,
90ImmutableClassesGiraphConfiguration conf) {
91 String message = "Tried to access aggregator which wasn't registered " +
92 aggregatorName;
93if (!hasRegisteredAggregators) {
94 message = message + "; Aggregators can be registered in " +
95"MasterCompute.initialize by calling " +
96"registerAggregator(aggregatorName, aggregatorClass). " +
97"Also be sure that you are correctly setting MasterCompute class, " +
98"currently using " + conf.getMasterComputeClass().getName();
99 }
100return message;
101 }
102103/**104 * Get the warning message about usage of unregistered reducer to be105 * printed to user. If user didn't register any reducers also provide106 * the explanation on how to do so.107 *108 * @param reducerName The name of the aggregator which user tried to109 * access110 * @param hasRegisteredReducers True iff user registered some aggregators111 * @param conf Giraph configuration112 * @return Warning message113 */114publicstatic String getUnregisteredReducerMessage(
115 String reducerName, boolean hasRegisteredReducers,
116ImmutableClassesGiraphConfiguration conf) {
117 String message = "Tried to access reducer which wasn't registered " +
118 reducerName;
119if (!hasRegisteredReducers) {
120 message = message + "; Aggregators can be registered from " +
121"MasterCompute by calling registerReducer function. " +
122"Also be sure that you are correctly setting MasterCompute class, " +
123"currently using " + conf.getMasterComputeClass().getName();
124 }
125return message;
126 }
127128/**129 * Get the warning message when user tries to access broadcast, without130 * previously setting it, to be printed to user.131 * If user didn't broadcast any value also provide132 * the explanation on how to do so.133 *134 * @param broadcastName The name of the broadcast which user tried to135 * access136 * @param hasBroadcasted True iff user has broadcasted value before137 * @param conf Giraph configuration138 * @return Warning message139 */140publicstatic String getUnregisteredBroadcastMessage(
141 String broadcastName, boolean hasBroadcasted,
142ImmutableClassesGiraphConfiguration conf) {
143 String message = "Tried to access broadcast which wasn't set before " +
144 broadcastName;
145if (!hasBroadcasted) {
146 message = message + "; Values can be broadcasted from " +
147"MasterCompute by calling broadcast function. " +
148"Also be sure that you are correctly setting MasterCompute class, " +
149"currently using " + conf.getMasterComputeClass().getName();
150 }
151return message;
152 }
153 }