This project has retired. For details please refer to its Attic page.
CollectShardedTuplesOfPrimitivesReducerHandle xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.collect;
19  
20  import java.util.ArrayList;
21  import java.util.List;
22  
23  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
24  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
25  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
26  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
27  import org.apache.giraph.master.MasterGlobalCommUsage;
28  import org.apache.giraph.reducers.ReduceOperation;
29  import org.apache.giraph.types.ops.PrimitiveTypeOps;
30  import org.apache.giraph.types.ops.TypeOpsUtils;
31  import org.apache.giraph.types.ops.collections.array.WArrayList;
32  import org.apache.giraph.worker.WorkerBroadcastUsage;
33  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
34  
35  /**
36   * ShardedReducerHandle where we keep a list of reduced values,
37   * and values consist of multiple primitives, so we keep one primitive
38   * list for each
39   */
40  @SuppressWarnings("unchecked")
41  public class CollectShardedTuplesOfPrimitivesReducerHandle
42  extends ShardedReducerHandle<List<Object>, List<WArrayList>> {
43    /**
44     * Type ops if available, or null
45     */
46    private final List<PrimitiveTypeOps> typeOpsList;
47  
48    public CollectShardedTuplesOfPrimitivesReducerHandle(
49        final CreateReducersApi reduceApi, Class<?>... valueClasses) {
50      typeOpsList = new ArrayList<>();
51      for (Class<?> valueClass : valueClasses) {
52        typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(valueClass));
53      }
54      register(reduceApi);
55    }
56  
57    public List<Object> createSingleValue() {
58      List<Object> ret = new ArrayList<>();
59      for (PrimitiveTypeOps typeOps : typeOpsList) {
60        ret.add(typeOps.create());
61      }
62      return ret;
63    }
64  
65    @Override
66    public ReduceOperation<List<Object>,
67        KryoWritableWrapper<List<WArrayList>>> createReduceOperation() {
68      return new CollectTuplesOfPrimitivesReduceOperation(typeOpsList);
69    }
70  
71    @Override
72    public List<WArrayList> createReduceResult(
73        MasterGlobalCommUsage master) {
74      int size = 0;
75      for (int i = 0; i < REDUCER_COUNT; i++) {
76        size += reducers.get(i).getReducedValue(master).get().get(0).size();
77      }
78      return createLists(size);
79    }
80  
81    public List<WArrayList> createLists(int size) {
82      List<WArrayList> ret = new ArrayList<>();
83      for (PrimitiveTypeOps typeOps : typeOpsList) {
84        ret.add(typeOps.createArrayList(size));
85      }
86      return ret;
87    }
88  
89    @Override
90    public BroadcastHandle<List<WArrayList>> createBroadcastHandle(
91        BroadcastArrayHandle<KryoWritableWrapper<List<WArrayList>>>
92            broadcasts) {
93      return new CollectShardedTuplesOfPrimitivesBroadcastHandle(broadcasts);
94    }
95  
96    /**
97     * BroadcastHandle for CollectShardedTuplesOfPrimitivesReducerHandle
98     */
99    public class CollectShardedTuplesOfPrimitivesBroadcastHandle
100       extends ShardedBroadcastHandle {
101     public CollectShardedTuplesOfPrimitivesBroadcastHandle(
102         BroadcastArrayHandle<KryoWritableWrapper<List<WArrayList>>>
103             broadcasts) {
104       super(broadcasts);
105     }
106 
107     @Override
108     public List<WArrayList> createBroadcastResult(
109         WorkerBroadcastUsage worker) {
110       int size = 0;
111       for (int i = 0; i < REDUCER_COUNT; i++) {
112         size += broadcasts.get(i).getBroadcast(worker).get().size();
113       }
114       return createLists(size);
115     }
116   }
117 
118   /**
119    * Reduce broadcast wrapper
120    */
121   public static class CollectShardedTuplesOfPrimitivesReduceBroadcast {
122     private CollectShardedTuplesOfPrimitivesReducerHandle reducerHandle;
123     private BroadcastHandle<List<WArrayList>> broadcastHandle;
124 
125     /** Set reducer handle to just registered handle */
126     public void registeredReducer(CreateReducersApi reduceApi,
127         Class<?>... valueClasses) {
128       this.reducerHandle = new CollectShardedTuplesOfPrimitivesReducerHandle(
129           reduceApi, valueClasses);
130     }
131 
132     public List<Object> createSingleValue() {
133       return reducerHandle.createSingleValue();
134     }
135 
136     /** Reduce single value */
137     public void reduce(List<Object> valueToReduce) {
138       reducerHandle.reduce(valueToReduce);
139     }
140 
141     /** Get reduced value */
142     public List<WArrayList> getReducedValue(MasterGlobalCommUsage master) {
143       return reducerHandle.getReducedValue(master);
144     }
145 
146     /**
147      * Broadcast reduced value from master
148      */
149     public void broadcastValue(BlockMasterApi master) {
150       broadcastHandle = reducerHandle.broadcastValue(master);
151     }
152 
153     /** Get broadcasted value */
154     public List<WArrayList> getBroadcast(WorkerBroadcastUsage worker) {
155       return broadcastHandle.getBroadcast(worker);
156     }
157   }
158 }