This project has retired. For details please refer to its Attic page.
AsyncMessageStoreWrapperTest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.queue;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertNotNull;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.util.Arrays;
28  
29  import org.apache.giraph.comm.messages.MessageStore;
30  import org.apache.giraph.factories.TestMessageValueFactory;
31  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
32  import org.apache.giraph.utils.VertexIdMessages;
33  import org.apache.hadoop.io.IntWritable;
34  import org.apache.hadoop.io.LongWritable;
35  import org.junit.Test;
36  
37  /**
38   * Test case for AsyncMessageStoreWrapper
39   */
40  public class AsyncMessageStoreWrapperTest {
41  
42  
43    @Test
44    public void testAsyncQueue() {
45      TestMessageStore store = new TestMessageStore();
46  
47      AsyncMessageStoreWrapper<LongWritable, IntWritable> queue =
48          new AsyncMessageStoreWrapper<>(store,
49          Arrays.asList(0, 1, 2, 3, 4), 2);
50  
51      for (int i = 0; i < 1000; i++) {
52        queue.addPartitionMessages(i % 5, new ByteArrayVertexIdMessages<LongWritable, IntWritable>(new TestMessageValueFactory<>(IntWritable.class)));
53      }
54  
55      queue.waitToComplete();
56  
57      assertArrayEquals(new int[] {200, 200, 200, 200, 200}, store.counters);
58  
59      queue.clearAll();
60    }
61  
62  
63    static class TestMessageStore implements MessageStore<LongWritable, IntWritable> {
64  
65      private int counters[] = new int[5];
66  
67      @Override
68      public void addPartitionMessages(int partition, VertexIdMessages messages) {
69        assertNotNull(messages);
70        counters[partition]++;
71      }
72  
73      @Override
74      public void addMessage(LongWritable vertexId, IntWritable message) throws IOException {
75      }
76  
77      @Override
78      public boolean isPointerListEncoding() {
79        return false;
80      }
81  
82      @Override
83      public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) {
84        return null;
85      }
86  
87      @Override
88      public void clearVertexMessages(LongWritable vertexId) {
89  
90      }
91  
92      @Override
93      public void clearAll() {
94  
95      }
96  
97      @Override
98      public boolean hasMessagesForVertex(LongWritable vertexId) {
99        return false;
100     }
101 
102     @Override
103     public boolean hasMessagesForPartition(int partitionId) {
104       return false;
105     }
106 
107     @Override
108     public void finalizeStore() {
109 
110     }
111 
112     @Override
113     public Iterable<LongWritable> getPartitionDestinationVertices(int partitionId) {
114       return null;
115     }
116 
117     @Override
118     public void clearPartition(int partitionId) {
119 
120     }
121 
122     @Override
123     public void writePartition(DataOutput out, int partitionId) throws IOException {
124 
125     }
126 
127     @Override
128     public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
129 
130     }
131 
132   }
133 }