1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.block_app.reducers.collect;
19
20 import java.util.ArrayList;
21 import java.util.List;
22
23 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
24 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
25 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
26 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
27 import org.apache.giraph.master.MasterGlobalCommUsage;
28 import org.apache.giraph.reducers.ReduceOperation;
29 import org.apache.giraph.types.ops.PrimitiveTypeOps;
30 import org.apache.giraph.types.ops.TypeOpsUtils;
31 import org.apache.giraph.types.ops.collections.array.WArrayList;
32 import org.apache.giraph.worker.WorkerBroadcastUsage;
33 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
34
35 /**
36 * ShardedReducerHandle where we keep a list of reduced values,
37 * and values consist of multiple primitives, so we keep one primitive
38 * list for each
39 */
40 @SuppressWarnings("unchecked")
41 public class CollectShardedTuplesOfPrimitivesReducerHandle
42 extends ShardedReducerHandle<List<Object>, List<WArrayList>> {
43 /**
44 * Type ops if available, or null
45 */
46 private final List<PrimitiveTypeOps> typeOpsList;
47
48 public CollectShardedTuplesOfPrimitivesReducerHandle(
49 final CreateReducersApi reduceApi, Class<?>... valueClasses) {
50 typeOpsList = new ArrayList<>();
51 for (Class<?> valueClass : valueClasses) {
52 typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(valueClass));
53 }
54 register(reduceApi);
55 }
56
57 public List<Object> createSingleValue() {
58 List<Object> ret = new ArrayList<>();
59 for (PrimitiveTypeOps typeOps : typeOpsList) {
60 ret.add(typeOps.create());
61 }
62 return ret;
63 }
64
65 @Override
66 public ReduceOperation<List<Object>,
67 KryoWritableWrapper<List<WArrayList>>> createReduceOperation() {
68 return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList);
69 }
70
71 @Override
72 public List<WArrayList> createReduceResult(
73 MasterGlobalCommUsage master) {
74 int size = 0;
75 for (int i = 0; i < REDUCER_COUNT; i++) {
76 size += reducers.get(i).getReducedValue(master).get().get(0).size();
77 }
78 return createLists(size);
79 }
80
81 public List<WArrayList> createLists(int size) {
82 List<WArrayList> ret = new ArrayList<>();
83 for (PrimitiveTypeOps typeOps : typeOpsList) {
84 ret.add(typeOps.createArrayList(size));
85 }
86 return ret;
87 }
88
89 @Override
90 public BroadcastHandle<List<WArrayList>> createBroadcastHandle(
91 BroadcastArrayHandle<KryoWritableWrapper<List<WArrayList>>>
92 broadcasts) {
93 return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts);
94 }
95
96 /**
97 * BroadcastHandle for CollectShardedTuplesOfPrimitivesReducerHandle
98 */
99 public class CollectShardedTuplesOfPrimitivesBroadcastHandle
100 extends ShardedBroadcastHandle {
101 public CollectShardedTuplesOfPrimitivesBroadcastHandle(
102 BroadcastArrayHandle<KryoWritableWrapper<List<WArrayList>>>
103 broadcasts) {
104 super(broadcasts);
105 }
106
107 @Override
108 public List<WArrayList> createBroadcastResult(
109 WorkerBroadcastUsage worker) {
110 int size = 0;
111 for (int i = 0; i < REDUCER_COUNT; i++) {
112 size += broadcasts.get(i).getBroadcast(worker).get().size();
113 }
114 return createLists(size);
115 }
116 }
117
118 /**
119 * Reduce broadcast wrapper
120 */
121 public static class CollectShardedTuplesOfPrimitivesReduceBroadcast {
122 private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle;
123 private BroadcastHandle<List<WArrayList>> broadcastHandle;
124
125 /** Set reducer handle to just registered handle */
126 public void registeredReducer(CreateReducersApi reduceApi,
127 Class<?>... valueClasses) {
128 this.reducerHandle = new CollectShardedTuplesOfPrimitivesReducerHandle(
129 reduceApi, valueClasses);
130 }
131
132 public List<Object> createSingleValue() {
133 return reducerHandle.createSingleValue();
134 }
135
136 /** Reduce single value */
137 public void reduce(List<Object> valueToReduce) {
138 reducerHandle.reduce(valueToReduce);
139 }
140
141 /** Get reduced value */
142 public List<WArrayList> getReducedValue(MasterGlobalCommUsage master) {
143 return reducerHandle.getReducedValue(master);
144 }
145
146 /**
147 * Broadcast reduced value from master
148 */
149 public void broadcastValue(BlockMasterApi master) {
150 broadcastHandle = reducerHandle.broadcastValue(master);
151 }
152
153 /** Get broadcasted value */
154 public List<WArrayList> getBroadcast(WorkerBroadcastUsage worker) {
155 return broadcastHandle.getBroadcast(worker);
156 }
157 }
158 }