This project has retired. For details please refer to its
Attic page.
TestBasicCollections xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.types;
19
20 import io.netty.util.internal.ThreadLocalRandom;
21
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26
27 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
28 import org.apache.giraph.types.ops.collections.BasicCollectionsUtils;
29 import org.apache.hadoop.io.FloatWritable;
30 import org.apache.hadoop.io.IntWritable;
31 import org.apache.hadoop.io.LongWritable;
32 import org.junit.Assert;
33 import org.junit.Test;
34
35
36
37
38 public class TestBasicCollections {
39 private void testLongWritable2Object(Map<Long, String> input) {
40 Basic2ObjectMap<LongWritable, String> map = BasicCollectionsUtils.create2ObjectMap(LongWritable.class);
41
42 LongWritable longW = new LongWritable();
43
44 long keySum = 0;
45 for (Long key : input.keySet()) {
46 longW.set(key.longValue());
47 Assert.assertNull(map.put(longW, input.get(key)));
48 keySum += key.longValue();
49 }
50 Assert.assertEquals(input.size(), map.size());
51
52 long sum = 0;
53 Iterator<LongWritable> iterator = map.fastKeyIterator();
54 while (iterator.hasNext()) {
55 sum += iterator.next().get();
56 }
57 Assert.assertEquals(keySum, sum);
58
59 for (Long key : input.keySet()) {
60 longW.set(key.longValue());
61 Assert.assertEquals(input.get(key), map.get(longW));
62 map.remove(longW);
63 }
64 Assert.assertEquals(0, map.size());
65 }
66
67 private void testFloatWritable2Object(Map<Float, String> input) {
68 Basic2ObjectMap<FloatWritable, String> map = BasicCollectionsUtils.create2ObjectMap(FloatWritable.class);
69
70 FloatWritable floatW = new FloatWritable();
71
72 float keySum = 0;
73 for (Float key : input.keySet()) {
74 floatW.set(key.longValue());
75 Assert.assertNull(map.put(floatW, input.get(key)));
76 keySum += key.longValue();
77 }
78 Assert.assertEquals(input.size(), map.size());
79
80 float sum = 0;
81 Iterator<FloatWritable> iterator = map.fastKeyIterator();
82 while (iterator.hasNext()) {
83 sum += iterator.next().get();
84 }
85 Assert.assertEquals(keySum, sum, 1e-6);
86
87 for (Float key : input.keySet()) {
88 floatW.set(key.longValue());
89 Assert.assertEquals(input.get(key), map.get(floatW));
90 map.remove(floatW);
91 }
92 Assert.assertEquals(0, map.size());
93 }
94
95 @Test
96 public void testLongWritable2Object() {
97 Map<Long, String> input = new HashMap<>();
98 input.put(-1l, "a");
99 input.put(0l, "b");
100 input.put(100l, "c");
101 input.put(26256l, "d");
102 input.put(-1367367l, "a");
103 input.put(-35635l, "e");
104 input.put(1234567l, "f");
105 testLongWritable2Object(input);
106 }
107
108 @Test
109 public void testFloatWritable2Object() {
110 Map<Float, String> input = new HashMap<>();
111 input.put(-1f, "a");
112 input.put(0f, "b");
113 input.put(1.23f, "c");
114 input.put(-12.34f, "d");
115 input.put(-1367367.45f, "a");
116 input.put(-3.456f, "e");
117 input.put(12.78f, "f");
118 testFloatWritable2Object(input);
119 }
120
121 private <K, V> V getConcurrently(Basic2ObjectMap<K, V> map, K key, V defaultValue) {
122 synchronized (map) {
123 V value = map.get(key);
124
125 if (value == null) {
126 value = defaultValue;
127 map.put(key, value);
128 }
129 return value;
130 }
131 }
132
133 private <K, V> void removeConcurrently(Basic2ObjectMap<K, V> map, K key) {
134 synchronized (map) {
135 map.remove(key);
136 }
137 }
138
139 @Test
140 public void testLongWritable2ObjectConcurrent() throws InterruptedException {
141 final int numThreads = 10;
142 final int numValues = 100000;
143
144 final Map<Integer, Double> map = new ConcurrentHashMap<>();
145 for (int i = 0; i < numValues; i++) {
146 double value = ThreadLocalRandom.current().nextDouble();
147 map.put(i, value);
148 }
149
150 final int PARTS = 8;
151 final Basic2ObjectMap<IntWritable, Double>[] basicMaps = new Basic2ObjectMap[PARTS];
152 for (int i = 0; i < PARTS; i++) {
153 basicMaps[i] = BasicCollectionsUtils.create2ObjectMap(IntWritable.class);
154 }
155
156 long startTime = System.currentTimeMillis();
157
158
159 Thread[] threads = new Thread[numThreads];
160 for (int t = 0; t < threads.length; t++) {
161 threads[t] = new Thread(new Runnable() {
162 @Override
163 public void run() {
164 IntWritable intW = new IntWritable();
165 for (int i = 0; i < numValues; i++) {
166 intW.set(i);
167 double value = getConcurrently(basicMaps[(i * 123 + 17) % PARTS], intW, map.get(i));
168 Assert.assertEquals(map.get(i).doubleValue(), value, 1e-6);
169 }
170 }
171 });
172 threads[t].start();
173 }
174 for (Thread t : threads) {
175 t.join();
176 }
177 int totalSize = 0;
178 for (int i = 0; i < PARTS; i++) {
179 totalSize += basicMaps[i].size();
180 }
181 Assert.assertEquals(numValues, totalSize);
182
183 long endTime = System.currentTimeMillis();
184 System.out.println("Add Time: " + (endTime - startTime) / 1000.0);
185
186
187 for (int t = 0; t < threads.length; t++) {
188 threads[t] = new Thread(new Runnable() {
189 @Override
190 public void run() {
191 IntWritable intW = new IntWritable();
192 for (int i = 0; i < numValues; i++) {
193 intW.set(i);
194 removeConcurrently(basicMaps[(i * 123 + 17) % PARTS], intW);
195 }
196 }
197 });
198 threads[t].start();
199 }
200 for (Thread t : threads) {
201 t.join();
202 }
203 for (int i = 0; i < PARTS; i++) {
204 Assert.assertEquals(0, basicMaps[i].size());
205 }
206 }
207 }