1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.aggregators;
20
21 import java.util.List;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.worker.WorkerInfo;
25
26 /**
27 * Class for aggregator constants and utility methods
28 */
29 public class AggregatorUtils {
30
31 /** How big a single aggregator request can be (in bytes) */
32 public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =
33 "giraph.maxBytesPerAggregatorRequest";
34 /** Default max size of single aggregator request (1MB) */
35 public static final int MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT =
36 1024 * 1024;
37 /**
38 * Whether or not to have a copy of aggregators for each compute thread.
39 * Unless aggregators are very large and it would hurt the application to
40 * have that many copies of them, user should use thread-local aggregators
41 * to prevent synchronization when aggregate() is called (and get better
42 * performance because of it).
43 */
44 public static final String USE_THREAD_LOCAL_AGGREGATORS =
45 "giraph.useThreadLocalAggregators";
46 /** Default is not to have a copy of aggregators for each thread */
47 public static final boolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false;
48
49 /** Do not instantiate */
50 private AggregatorUtils() { }
51
52 /**
53 * Get owner of aggregator with selected name from the list of workers
54 *
55 * @param aggregatorName Name of the aggregators
56 * @param workers List of workers
57 * @return Worker which owns the aggregator
58 */
59 public static WorkerInfo getOwner(String aggregatorName,
60 List<WorkerInfo> workers) {
61 int index = Math.abs(aggregatorName.hashCode() % workers.size());
62 return workers.get(index);
63 }
64
65 /**
66 * Check if we should use thread local aggregators.
67 *
68 * @param conf Giraph configuration
69 * @return True iff we should use thread local aggregators
70 */
71 public static boolean
72 useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) {
73 return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS,
74 USE_THREAD_LOCAL_AGGREGATORS_DEFAULT);
75 }
76
77 /**
78 * Get the warning message about usage of unregistered aggregator to be
79 * printed to user. If user didn't register any aggregators also provide
80 * the explanation on how to do so.
81 *
82 * @param aggregatorName The name of the aggregator which user tried to
83 * access
84 * @param hasRegisteredAggregators True iff user registered some aggregators
85 * @param conf Giraph configuration
86 * @return Warning message
87 */
88 public static String getUnregisteredAggregatorMessage(
89 String aggregatorName, boolean hasRegisteredAggregators,
90 ImmutableClassesGiraphConfiguration conf) {
91 String message = "Tried to access aggregator which wasn't registered " +
92 aggregatorName;
93 if (!hasRegisteredAggregators) {
94 message = message + "; Aggregators can be registered in " +
95 "MasterCompute.initialize by calling " +
96 "registerAggregator(aggregatorName, aggregatorClass). " +
97 "Also be sure that you are correctly setting MasterCompute class, " +
98 "currently using " + conf.getMasterComputeClass().getName();
99 }
100 return message;
101 }
102
103 /**
104 * Get the warning message about usage of unregistered reducer to be
105 * printed to user. If user didn't register any reducers also provide
106 * the explanation on how to do so.
107 *
108 * @param reducerName The name of the aggregator which user tried to
109 * access
110 * @param hasRegisteredReducers True iff user registered some aggregators
111 * @param conf Giraph configuration
112 * @return Warning message
113 */
114 public static String getUnregisteredReducerMessage(
115 String reducerName, boolean hasRegisteredReducers,
116 ImmutableClassesGiraphConfiguration conf) {
117 String message = "Tried to access reducer which wasn't registered " +
118 reducerName;
119 if (!hasRegisteredReducers) {
120 message = message + "; Aggregators can be registered from " +
121 "MasterCompute by calling registerReducer function. " +
122 "Also be sure that you are correctly setting MasterCompute class, " +
123 "currently using " + conf.getMasterComputeClass().getName();
124 }
125 return message;
126 }
127
128 /**
129 * Get the warning message when user tries to access broadcast, without
130 * previously setting it, to be printed to user.
131 * If user didn't broadcast any value also provide
132 * the explanation on how to do so.
133 *
134 * @param broadcastName The name of the broadcast which user tried to
135 * access
136 * @param hasBroadcasted True iff user has broadcasted value before
137 * @param conf Giraph configuration
138 * @return Warning message
139 */
140 public static String getUnregisteredBroadcastMessage(
141 String broadcastName, boolean hasBroadcasted,
142 ImmutableClassesGiraphConfiguration conf) {
143 String message = "Tried to access broadcast which wasn't set before " +
144 broadcastName;
145 if (!hasBroadcasted) {
146 message = message + "; Values can be broadcasted from " +
147 "MasterCompute by calling broadcast function. " +
148 "Also be sure that you are correctly setting MasterCompute class, " +
149 "currently using " + conf.getMasterComputeClass().getName();
150 }
151 return message;
152 }
153 }