Monday, July 1, 2013

Cassandra: Using compound keys with SSTableSimpleUnsortedWriter and sstableloader

I started using Cassandra 1.2.5. I created a keyspace and a table with a compound key using CQL3.
   create keyspace test_keyspace with replication = {'class': 'SimpleStrategy', 'replication_factor':1};
   create table test_table ( k1 bigint, k2 bigint, created timestamp, PRIMARY KEY (k1, k2) ) with compaction = { 'class' : 'LeveledCompactionStrategy' };
My next task was to populate the table with a lot of data. I used sstableloader for the task, which loads sstable files created with SSTableSimpleUnsortedWriter. The sample code for that class uses a simple key, not a compound key. I looked at the classes in the org.apache.cassandra.db.marshal package and found CompositeType, which looked like what I should be using. Intuitively, I thought that since my key is a compound key, the row key would be a CompositeType and the rest would work as in the simple example, so I tried the following code:
   List<AbstractType<?>> compositeList = new ArrayList<AbstractType<?>>();
   compositeList.add( LongType.instance );
   compositeList.add( LongType.instance );
   CompositeType compositeType = CompositeType.getInstance( compositeList );
   SSTableSimpleUnsortedWriter sstableWriter = new SSTableSimpleUnsortedWriter(
      new File( System.getProperty( "output" ) ),
      new Murmur3Partitioner(),
      "test_keyspace",
      "test_table",
      compositeType,
      null,
      64 );
   long timestamp = System.currentTimeMillis();
   long nanotimestamp = timestamp * 1000; // despite the name, milliseconds * 1000 is microseconds
   long k1 = 5L;
   long k2 = 10L;
   sstableWriter.newRow( compositeType.builder().add( bytes( k1 ) ).add( bytes( k2 ) ).build() );
   sstableWriter.addColumn( bytes( "created" ), bytes( timestamp ), nanotimestamp );
   sstableWriter.close();
I then loaded the sstable files into Cassandra using the command "sstableloader -v -debug test_keyspace/test_table/". The command finished without any indication of a problem, but the table remained empty. I went over the node's log file and saw this cryptic exception:
java.lang.RuntimeException: java.lang.IllegalArgumentException
        at org.apache.cassandra.service.RangeSliceVerbHandler.doVerb(RangeSliceVerbHandler.java:64)
        at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:56)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:247)
        at org.apache.cassandra.db.marshal.AbstractCompositeType.getBytes(AbstractCompositeType.java:51)
        at org.apache.cassandra.db.marshal.AbstractCompositeType.getWithShortLength(AbstractCompositeType.java:60)
        at org.apache.cassandra.db.marshal.AbstractCompositeType.split(AbstractCompositeType.java:126)
        at org.apache.cassandra.db.filter.ColumnCounter$GroupByPrefix.count(ColumnCounter.java:96)
        at org.apache.cassandra.db.filter.SliceQueryFilter.collectReducedColumns(SliceQueryFilter.java:164)
        at org.apache.cassandra.db.filter.QueryFilter.collateColumns(QueryFilter.java:136)
        at org.apache.cassandra.db.filter.QueryFilter.collateOnDiskAtom(QueryFilter.java:84)
        at org.apache.cassandra.db.RowIteratorFactory$2.getReduced(RowIteratorFactory.java:106)
        at org.apache.cassandra.db.RowIteratorFactory$2.getReduced(RowIteratorFactory.java:79)
        at org.apache.cassandra.utils.MergeIterator$ManyToOne.consume(MergeIterator.java:114)
        at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:97)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.cassandra.db.ColumnFamilyStore$3.computeNext(ColumnFamilyStore.java:1399)
        at org.apache.cassandra.db.ColumnFamilyStore$3.computeNext(ColumnFamilyStore.java:1395)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.cassandra.db.ColumnFamilyStore.filter(ColumnFamilyStore.java:1466)
        at org.apache.cassandra.db.ColumnFamilyStore.getRangeSlice(ColumnFamilyStore.java:1443)
        at org.apache.cassandra.service.RangeSliceVerbHandler.executeLocally(RangeSliceVerbHandler.java:46)
        at org.apache.cassandra.service.RangeSliceVerbHandler.doVerb(RangeSliceVerbHandler.java:58)
        ... 4 more
I sent a question to the Cassandra user mailing list and got a reply from Aaron Morton, which pointed me in the right direction (http://thelastpickle.com/2013/01/11/primary-keys-in-cql). I inserted a row manually and used cassandra-cli to see what the data looks like:
RowKey: 5
=> (column=10:created, value=0000013f84be6288, timestamp=1372321637000000)
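From this example we see that the row key is a single Long value, "5" (k1 alone), and that the row has one composite column, "10:created", whose name combines k2 with the CQL column name and whose value is the timestamp. In other words, only the first part of the compound key is the row key; the second part becomes a prefix of every column name in that row. This most likely explains the exception as well: in my first attempt the column name was the plain string "created", and the stack trace shows the node failing in AbstractCompositeType.split while trying to read it as a composite name. Thus the code should look like this: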
   List<AbstractType<?>> compositeList = new ArrayList<AbstractType<?>>();
   compositeList.add( LongType.instance );
   compositeList.add( UTF8Type.instance ); // second component is the CQL3 column name ("created"), which is text, not a long
   CompositeType compositeType = CompositeType.getInstance( compositeList );
   SSTableSimpleUnsortedWriter sstableWriter = new SSTableSimpleUnsortedWriter(
      new File( System.getProperty( "output" ) ),
      new Murmur3Partitioner(),
      "test_keyspace",
      "test_table",
      compositeType,
      null,
      64 );
   long timestamp = System.currentTimeMillis();
   long nanotimestamp = timestamp * 1000; // despite the name, milliseconds * 1000 is microseconds
   long k1 = 5L;
   long k2 = 10L;
   sstableWriter.newRow( bytes( k1 ) );
   sstableWriter.addColumn( compositeType.builder().add( bytes( k2 ) ).add( bytes( "created" ) ).build(), bytes( timestamp ), nanotimestamp );
   sstableWriter.close();
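To make the difference between the two attempts more concrete, here is a minimal sketch of how I understand a composite column name to be laid out in Cassandra 1.2: each component is written as a 2-byte length, the component bytes, and a 1-byte end-of-component marker. The sketch uses only java.nio, not Cassandra's own classes; the class name CompositeLayoutSketch and its helper methods are hypothetical names made up for this illustration, and the layout is my assumption based on the behaviour above rather than a reference implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CompositeLayoutSketch {

    // Assumed composite layout: for every component,
    // <2-byte length><component bytes><1-byte end-of-component marker (0 here)>.
    static ByteBuffer encode(byte[]... components) {
        int size = 0;
        for (byte[] c : components) {
            size += 2 + c.length + 1;
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (byte[] c : components) {
            out.putShort((short) c.length);
            out.put(c);
            out.put((byte) 0);
        }
        out.flip();
        return out;
    }

    // A long as 8 big-endian bytes, which is how a bigint is serialized.
    static byte[] longBytes(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    // The length prefix a composite parser would read from the start of a name.
    static int firstComponentLength(ByteBuffer name) {
        return name.getShort(name.position()) & 0xFFFF;
    }

    // Hex dump of the remaining bytes, without changing the buffer's position.
    static String hex(ByteBuffer buffer) {
        StringBuilder sb = new StringBuilder();
        ByteBuffer copy = buffer.duplicate();
        while (copy.hasRemaining()) {
            sb.append(String.format("%02x", copy.get() & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] created = "created".getBytes(StandardCharsets.UTF_8);

        // Second attempt: the column name is the composite (k2, "created").
        ByteBuffer compositeName = encode(longBytes(10L), created);
        System.out.println("composite name: " + hex(compositeName));

        // First attempt: the column name is the plain string "created".
        // Read as a composite, its first two bytes ('c', 'r') become a length
        // that is far larger than the 7 bytes actually available.
        ByteBuffer plainName = ByteBuffer.wrap(created);
        System.out.println("bogus length " + firstComponentLength(plainName)
                + " vs " + plainName.remaining() + " bytes available");
    }
}
```

Under these assumptions, the plain name "created" yields a length prefix of 25458 for a 7-byte buffer, which is consistent with the IllegalArgumentException thrown from Buffer.limit in the stack trace above.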