Skip to content

Commit

Permalink
ACCUMULO-469 added license headers
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1302533 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
billierinaldi committed Mar 19, 2012
1 parent e1dfeb6 commit 66bb45c
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.ingest;

import java.util.LinkedHashMap;
Expand All @@ -6,14 +22,12 @@
public class LRUOutputCombiner<Key,Value> extends LinkedHashMap<Key,Value> {

private static final long serialVersionUID = 1L;

// NOTE(review): this span is a rendered diff hunk. The header with the brace on its own line
// appears to be the removed form, and the header with the trailing brace its reformatted
// replacement; only one of the two belongs in the real source file.
/**
 * Abstract callback declaring a single method that takes an old and a new value of the same
 * type and returns a value of that type.
 */
public static abstract class Fold <Value>
{

public static abstract class Fold<Value> {
// Presumably merges the value already held for a key with an incoming one; the caller sits in
// the collapsed part of the enclosing class, so confirm there.
public abstract Value fold(Value oldValue, Value newValue);
}

// NOTE(review): rendered diff hunk. The first header (brace on the next line) appears to be the
// removed form and the second header its reformatted replacement.
/**
 * Abstract sink declaring a single method that receives a key/value pair and returns nothing.
 */
public static abstract class Output<Key,Value>
{
public static abstract class Output<Key,Value> {
// Presumably invoked for entries leaving the enclosing combiner; the call site is not visible
// in this hunk. TODO confirm.
public abstract void output(Key key, Value value);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.output;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ColumnUpdate;
Expand All @@ -31,38 +47,32 @@ final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {

private Map<Text,TreeMap<Key,Value>> buffers = new HashMap<Text,TreeMap<Key,Value>>();
private Map<Text,Long> bufferSizes = new HashMap<Text,Long>();

// NOTE(review): rendered diff hunk. Lines that appear twice with different brace/spacing style
// appear to be the removed form followed by its reformatted replacement.
/**
 * Returns the buffer registered for the given table, creating it on first use.
 *
 * When no buffer exists yet, an empty TreeMap is stored in {@code buffers} and a size of zero is
 * recorded in {@code bufferSizes} under the same table name.
 *
 * @param tablename key used to look up the buffer in {@code buffers}
 * @return the existing or newly created buffer; never null
 */
private TreeMap<Key,Value> getBuffer(Text tablename)
{

private TreeMap<Key,Value> getBuffer(Text tablename) {
TreeMap<Key,Value> buffer = buffers.get(tablename);
if(buffer == null)
{
if (buffer == null) {
buffer = new TreeMap<Key,Value>();
buffers.put(tablename, buffer);
// Register the size entry together with the buffer so both maps share the same key set.
bufferSizes.put(tablename, 0l);
}
return buffer;
}

// NOTE(review): rendered diff hunk. Lines that appear twice with different brace/spacing style
// appear to be the removed form followed by its reformatted replacement.
/**
 * Finds the table whose recorded size in {@code bufferSizes} is largest.
 *
 * @return the key with the greatest size strictly above zero, or null when {@code bufferSizes}
 *         is empty or no entry has a positive size
 */
private Text getLargestTablename()
{
private Text getLargestTablename() {
// Starting at 0 with a strict comparison means entries of size 0 are never selected.
long max = 0;
Text table = null;
for(Entry<Text,Long> e:bufferSizes.entrySet())
{
if(e.getValue() > max)
{
for (Entry<Text,Long> e : bufferSizes.entrySet()) {
// Strict '>' keeps the first entry seen among equal sizes; bufferSizes is a HashMap, so
// which entry that is depends on its iteration order.
if (e.getValue() > max) {
max = e.getValue();
table = e.getKey();
}
}
return table;
}

private void flushLargestTable() throws IOException
{
private void flushLargestTable() throws IOException {
Text tablename = getLargestTablename();
if(tablename == null)
if (tablename == null)
return;
long bufferSize = bufferSizes.get(tablename);
TreeMap<Key,Value> buffer = buffers.get(tablename);
Expand Down Expand Up @@ -98,17 +108,17 @@ private void flushLargestTable() throws IOException

// NOTE(review): rendered diff hunk. The two while-headers differ only in spacing; the first
// appears to be the removed form and the second its reformatted replacement.
/**
 * Calls flushLargestTable() repeatedly for as long as the {@code size} field is positive.
 *
 * @param arg0 task attempt context; not used by this method
 */
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
// `size` is declared outside this hunk; presumably the total buffered size that
// flushLargestTable() reduces. TODO confirm.
// NOTE(review): flushLargestTable() returns without doing anything when no table has a
// positive recorded size; if `size` could still be > 0 then, this loop would not terminate.
// Verify the two counters cannot diverge.
while(size > 0)
while (size > 0)
flushLargestTable();
}

@Override
public void write(Text table, Mutation mutation) throws IOException, InterruptedException {
TreeMap<Key,Value> buffer = getBuffer(table);
int mutationSize = 0;
for(ColumnUpdate update: mutation.getUpdates())
{
Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
for (ColumnUpdate update : mutation.getUpdates()) {
Key k = new Key(mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(),
update.isDeleted());
Value v = new Value(update.getValue());
// TODO account for object overhead
mutationSize += k.getSize();
Expand All @@ -121,7 +131,7 @@ public void write(Text table, Mutation mutation) throws IOException, Interrupted
// TODO use a MutableLong instead
bufferSize += mutationSize;
bufferSizes.put(table, bufferSize);

while (size >= maxSize) {
flushLargestTable();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.output;

import java.io.IOException;
Expand All @@ -14,9 +30,9 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {

// private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);

public static final String PATH_NAME = "sortingrfileoutputformat.path";
public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.ingest;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
Expand All @@ -17,20 +32,19 @@

public class WikipediaInputSplitTest {
@Test
public void testSerialization() throws IOException
{
public void testSerialization() throws IOException {
Path testPath = new Path("/foo/bar");
String [] hosts = new String [2];
String[] hosts = new String[2];
hosts[0] = "abcd";
hosts[1] = "efgh";
FileSplit fSplit = new FileSplit(testPath,1,2,hosts);
WikipediaInputSplit split = new WikipediaInputSplit(fSplit,7);
FileSplit fSplit = new FileSplit(testPath, 1, 2, hosts);
WikipediaInputSplit split = new WikipediaInputSplit(fSplit, 7);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
split.write(out);
out.close();
out.close();
baos.close();

ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInput in = new ObjectInputStream(bais);

Expand All @@ -45,12 +59,11 @@ public void testSerialization() throws IOException
Assert.assertTrue(fSplit.getPath().equals(fSplit2.getPath()));
Assert.assertTrue(fSplit.getStart() == fSplit2.getStart());
Assert.assertTrue(fSplit.getLength() == fSplit2.getLength());

String [] hosts2 = fSplit2.getLocations();
String[] hosts2 = fSplit2.getLocations();
Assert.assertEquals(hosts.length, hosts2.length);
for(int i = 0; i < hosts.length; i++)
{
Assert.assertEquals(hosts[i],hosts2[i]);
for (int i = 0; i < hosts.length; i++) {
Assert.assertEquals(hosts[i], hosts2[i]);
}
}
}

0 comments on commit 66bb45c

Please sign in to comment.