View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.queue;
20  
21  import org.apache.giraph.comm.messages.MessageStore;
22  import org.apache.giraph.factories.TestMessageValueFactory;
23  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
24  import org.apache.giraph.utils.VertexIdMessages;
25  import org.apache.hadoop.io.IntWritable;
26  import org.apache.hadoop.io.LongWritable;
27  import org.junit.Test;
28  
29  import java.io.DataInput;
30  import java.io.DataOutput;
31  import java.io.IOException;
32  import java.util.Arrays;
33  
34  import static org.junit.Assert.assertArrayEquals;
35  import static org.junit.Assert.assertNotNull;
36  
37  /**
38   * Test case for AsyncMessageStoreWrapper
39   */
40  public class AsyncMessageStoreWrapperTest {
41  
42  
43    @Test
44    public void testAsyncQueue() {
45      TestMessageStore store = new TestMessageStore();
46  
47      AsyncMessageStoreWrapper<LongWritable, IntWritable> queue =
48          new AsyncMessageStoreWrapper<>(store,
49          Arrays.asList(0, 1, 2, 3, 4), 2);
50  
51      for (int i = 0; i < 1000; i++) {
52        queue.addPartitionMessages(i % 5, new ByteArrayVertexIdMessages<LongWritable, IntWritable>(new TestMessageValueFactory<>(IntWritable.class)));
53      }
54  
55      queue.waitToComplete();
56  
57      assertArrayEquals(new int[] {200, 200, 200, 200, 200}, store.counters);
58  
59      queue.clearAll();
60    }
61  
62  
63    static class TestMessageStore implements MessageStore<LongWritable, IntWritable> {
64  
65      private int counters[] = new int[5];
66  
67      @Override
68      public void addPartitionMessages(int partition, VertexIdMessages messages) {
69        assertNotNull(messages);
70        counters[partition]++;
71      }
72  
73      @Override
74      public boolean isPointerListEncoding() {
75        return false;
76      }
77  
78      @Override
79      public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) {
80        return null;
81      }
82  
83      @Override
84      public void clearVertexMessages(LongWritable vertexId) {
85  
86      }
87  
88      @Override
89      public void clearAll() {
90  
91      }
92  
93      @Override
94      public boolean hasMessagesForVertex(LongWritable vertexId) {
95        return false;
96      }
97  
98      @Override
99      public boolean hasMessagesForPartition(int partitionId) {
100       return false;
101     }
102 
103     @Override
104     public void finalizeStore() {
105 
106     }
107 
108     @Override
109     public Iterable<LongWritable> getPartitionDestinationVertices(int partitionId) {
110       return null;
111     }
112 
113     @Override
114     public void clearPartition(int partitionId) {
115 
116     }
117 
118     @Override
119     public void writePartition(DataOutput out, int partitionId) throws IOException {
120 
121     }
122 
123     @Override
124     public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
125 
126     }
127   }
128 }