Migrating an Apache Cassandra Table to MongoDB: Migrating the Cassandra Table to MongoDB

In this section we shall retrieve the data stored in the Cassandra database and migrate it to the MongoDB server, using the MigrateCassandraToMongoDB class.

  1. Add a method called migrate() to the MigrateCassandraToMongoDB class and invoke the method from the main method.
  2. In the main method of the MigrateCassandraToMongoDB class, connect to the Cassandra server as explained in the previous section.

cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

session = cluster.connect();

A Session object represents a connection to the Cassandra server. We shall use the Session object to run a SELECT statement on Cassandra that selects the data to be migrated.

  1. In the migrate() method, run a SELECT statement to select all rows from the catalog table in the datastax keyspace.

ResultSet results = session.execute("select * from datastax.catalog");

  1. The result set of the query is represented with the ResultSet class. A row in the ResultSet is represented with the Row class. Iterate over the ResultSet to fetch each row as a Row object.

for (Row row : results) {

}

Before migrating the rows fetched from Cassandra, create a Java client for MongoDB, because we need to connect to MongoDB to add the fetched data.

  1. The MongoClient class represents a client to MongoDB and provides internal connection pooling. We shall use the MongoClient(List<ServerAddress> seeds) constructor, with a ServerAddress instance constructed from the host localhost and the port 27017 on which the server is running. In the migrate() method, create a MongoClient instance.

MongoClient mongoClient = new MongoClient(Arrays.asList(new ServerAddress("localhost", 27017)));

  1. A database instance is represented with the com.mongodb.client.MongoDatabase class. Create a database object for the local database.

MongoDatabase db = mongoClient.getDatabase("local");

  1. A MongoDB database collection is represented with the com.mongodb.client.MongoCollection class. Next, create a MongoDB collection instance called catalog using the getCollection(String collectionName) method of the MongoDatabase object.

MongoCollection<Document> coll = db.getCollection("catalog");

  1. A MongoDB collection does not have to be created beforehand; MongoDB creates it implicitly when data is first stored in a collection referenced by name. Next, we shall migrate the result set obtained from Cassandra to MongoDB using a for loop to iterate over the rows in the result set.

for (Row row : results) {

}

  1. The ColumnDefinitions class represents the metadata describing the columns contained in a ResultSet. Obtain the column definitions as represented by a ColumnDefinitions object using the getColumnDefinitions() method of Row. Create an Iterator over the ColumnDefinitions object using the iterator() method with each column definition being represented with ColumnDefinitions.Definition.

ColumnDefinitions columnDefinitions = row.getColumnDefinitions();

Iterator<ColumnDefinitions.Definition> iter = columnDefinitions.iterator();

  1. Using a while loop and the hasNext method of Iterator, iterate over the columns and obtain the column names using the getName method in ColumnDefinitions.Definition.

while (iter.hasNext()) {

ColumnDefinitions.Definition column = iter.next();

String columnName = column.getName();

}

  1. Within the while loop, using the getString(String columnName) method in Row obtain the value corresponding to each column.

String columnValue = row.getString(columnName);

The org.bson.Document class represents a MongoDB document as a Map, that is, a key/value map that may be stored in a MongoDB database. The org.bson.Document class implements the org.bson.conversions.Bson interface and represents a basic BSON object stored in the MongoDB server.

  1. Next, we shall add a BSON document to the MongoDB database. Within the for loop, create an org.bson.Document instance using the class constructor.

Document catalog = new Document();

  1. Once an org.bson.Document has been created, key/value pairs may be added using the append(String key, Object value) method in the org.bson.Document class. Use the column name/value pairs obtained from the Cassandra result set to build a complete BSON document with the append method.

catalog = catalog.append(columnName, columnValue);
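The column loop in the preceding steps can be tried out without either server running. The following is only a stand-in sketch, not the driver code: it assumes a Cassandra row can be modelled as a plain Map of column names to text values, and it uses a LinkedHashMap in place of org.bson.Document (which itself implements Map<String, Object>). The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: a row is a Map of column name -> text value,
// and the "document" is a LinkedHashMap instead of org.bson.Document.
final class RowToDocumentSketch {

    // Copies every column of one row into a new document, preserving column order.
    static Map<String, Object> toDocument(Map<String, String> row) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : row.entrySet()) {
            // Mirrors catalog.append(columnName, columnValue) in the text.
            document.put(column.getKey(), column.getValue());
        }
        return document;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("catalog_id", "catalog1");
        row.put("journal", "Oracle Magazine");
        row.put("title", "Engineering as a Service");
        System.out.println(toDocument(row));
    }
}
```

With the real driver, getString() is intended for text columns such as those in the catalog table; a table with other column types would need the matching getter for each column.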

  1. As discussed in Chapter 1 the MongoCollection class provides the insertOne(TDocument document) method to add a single document. Save the Document instance using the insertOne(TDocument document) method.

coll.insertOne(catalog);

  1. Next, output the documents added to the MongoDB collection, which also verifies that they were added. All the documents in a collection may be fetched using the find() method in MongoCollection. The find() method returns a FindIterable object.

FindIterable<Document> iterable = coll.find();

  1. As discussed in Chapter 1, output the key/value pairs for each document stored in the FindIterable object. Use an enhanced for loop to obtain the Document instances in the FindIterable. Obtain the key set associated with each Document instance using the keySet() method, which returns a Set<String> object. Create an Iterator<String> object from the Set object using iterator(). Use a while loop to iterate over the key set and output the document key for each Document and the associated Document object.

FindIterable<Document> iterable = coll.find();

String documentKey = null;

for (Document document : iterable) {

Set<String> keySet = document.keySet();

Iterator<String> iter = keySet.iterator();

while (iter.hasNext()) {

documentKey = iter.next();

System.out.println(documentKey);

System.out.println(document.get(documentKey));

}

}
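Because Document implements Map<String, Object>, the same output could also be produced by walking the map entries directly instead of looking up each key. A minimal sketch of that variation follows, again with a LinkedHashMap standing in for Document so that it runs without a MongoDB server; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PrintDocumentSketch {

    // Builds the same key-then-value lines that the while loop in the text prints.
    static String describe(Map<String, Object> document) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Object> entry : document.entrySet()) {
            out.append(entry.getKey()).append('\n');
            out.append(entry.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Stand-in for one Document returned by coll.find().
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("catalog_id", "catalog1");
        document.put("journal", "Oracle Magazine");
        System.out.print(describe(document));
    }
}
```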

  1. Close the MongoClient object using the close() method.

mongoClient.close();

  2. Also shut down the Cassandra session and cluster.

session.close();

cluster.close();
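Closing each object by hand, as in the statements above, leaves the connections open if the migration throws an exception first. Assuming the driver classes in use implement java.io.Closeable (worth checking against the driver versions on the classpath), try-with-resources is one way to guarantee cleanup. The sketch below demonstrates only the closing order, with a hypothetical Resource class standing in for Cluster, Session and MongoClient.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {

    // Hypothetical stand-in for a closeable driver object; records when it is closed.
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    // Opens three resources and returns the order in which they were closed.
    static List<String> run(boolean fail) {
        List<String> closed = new ArrayList<>();
        try (Resource cluster = new Resource("cluster", closed);
             Resource session = new Resource("session", closed);
             Resource mongoClient = new Resource("mongoClient", closed)) {
            if (fail) {
                throw new IllegalStateException("simulated migration failure");
            }
        } catch (IllegalStateException e) {
            // The resources have already been closed when this handler runs.
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Resources are closed in the reverse of their declaration order, so the stand-ins are released as mongoClient, session, cluster, which matches the order of the explicit close() calls in the text, whether or not the body fails.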

The MigrateCassandraToMongoDB class is listed:

package mongodb;

import java.util.Arrays;

import java.util.Iterator;

import java.util.Set;

import org.bson.Document;

import com.datastax.driver.core.Cluster;

import com.datastax.driver.core.ColumnDefinitions;

import com.datastax.driver.core.ResultSet;

import com.datastax.driver.core.Row;

import com.datastax.driver.core.Session;

import com.mongodb.MongoClient;

import com.mongodb.ServerAddress;

import com.mongodb.client.FindIterable;

import com.mongodb.client.MongoCollection;

import com.mongodb.client.MongoDatabase;

public class MigrateCassandraToMongoDB {

private static Cluster cluster;

private static Session session;

private static MongoClient mongoClient;

public static void main(String[] args) {

cluster = Cluster.builder().addContactPoint("127.0.0.1").build();

session = cluster.connect();

migrate();

mongoClient.close();

session.close();

cluster.close();

}

public static void migrate() {

mongoClient = new MongoClient(Arrays.asList(new ServerAddress("localhost", 27017)));

MongoDatabase db = mongoClient.getDatabase("local");

MongoCollection<Document> coll = db.getCollection("catalog");

ResultSet results = session.execute("select * from datastax.catalog");

for (Row row : results) {

ColumnDefinitions columnDefinitions = row.getColumnDefinitions();

Iterator<ColumnDefinitions.Definition> iter = columnDefinitions.iterator();

Document catalog = new Document();

while (iter.hasNext()) {

ColumnDefinitions.Definition column = iter.next();

String columnName = column.getName();

String columnValue = row.getString(columnName);

catalog = catalog.append(columnName, columnValue);

}

coll.insertOne(catalog);

}

FindIterable<Document> iterable = coll.find();

String documentKey = null;

for (Document document : iterable) {

Set<String> keySet = document.keySet();

Iterator<String> iter = keySet.iterator();

while (iter.hasNext()) {

documentKey = iter.next();

System.out.println(documentKey);

System.out.println(document.get(documentKey));

}

}

}

}

  1. To migrate the Cassandra table to MongoDB, right-click on the MigrateCassandraToMongoDB class in Package Explorer and select Run As > Java Application as shown in Figure 6-16.

The Apache Cassandra table gets migrated to MongoDB. The table data migrated to MongoDB is also output in the Eclipse console as shown in Figure 6-17.

The more detailed output from the application is shown below.

16:31:48.058 [main] INFO com.datastax.driver.core.Cluster - New Cassandra host /127.0.0.1:9042 added

16:31:48.143 [cluster1-nio-worker-1] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-2, inFlight=0, closed=false] Connection opened successfully

16:31:48.168 [cluster1-nio-worker-1] DEBUG com.datastax.driver.core.Session - Added connection pool for /127.0.0.1:9042

16:31:48.176 [cluster1-nio-worker-2] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-3, inFlight=0, closed=false] Connection opened successfully

16:31:48.190 [cluster1-nio-worker-2] DEBUG com.datastax.driver.core.Session - Added connection pool for /127.0.0.1:9042

16:31:49.122 [main] INFO org.mongodb.driver.cluster - Cluster created with settings {hosts=[localhost:27017], mode=MULTIPLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}

16:31:49.122 [main] INFO org.mongodb.driver.cluster - Adding discovered server localhost:27017 to client view of cluster

16:31:49.165 [main] DEBUG org.mongodb.driver.cluster - Updating cluster description to {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]

16:31:49.253 [cluster-ClusterId{value='55c93465cd73501ae89a80b9', description='null'}-localhost:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:1, serverValue:16}] to localhost:27017

16:31:49.254 [cluster-ClusterId{value='55c93465cd73501ae89a80b9', description='null'}-localhost:27017] DEBUG org.mongodb.driver.cluster - Checking status of localhost:27017

16:31:49.256 [cluster-ClusterId{value='55c93465cd73501ae89a80b9', description='null'}-localhost:27017] INFO org.mongodb.driver.cluster - Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 0, 5]}, minWireVersion=0, maxWireVersion=3, electionId=null, maxDocumentSize=16777216, roundTripTimeNanos=2118747}

16:31:49.258 [cluster-ClusterId{value='55c93465cd73501ae89a80b9', description='null'}-localhost:27017] INFO org.mongodb.driver.cluster - Discovered cluster type of STANDALONE

16:31:49.259 [cluster-ClusterId{value='55c93465cd73501ae89a80b9', description='null'}-localhost:27017] DEBUG org.mongodb.driver.cluster - Updating cluster description to {type=STANDALONE, servers=[{address=localhost:27017, type=STANDALONE, roundTripTime=2.1 ms, state=CONNECTED}]

16:31:49.281 [main] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:2, serverValue:17}] to localhost:27017

16:31:49.307 [main] DEBUG org.mongodb.driver.protocol.insert - Inserting 1 documents into namespace local.catalog on connection [connectionId{localValue:2, serverValue:17}] to server localhost:27017

16:31:49.318 [main] DEBUG org.mongodb.driver.protocol.insert - Insert completed

16:31:49.319 [main] DEBUG org.mongodb.driver.protocol.insert - Inserting 1 documents into namespace local.catalog on connection [connectionId{localValue:2, serverValue:17}] to server localhost:27017

16:31:49.320 [main] DEBUG org.mongodb.driver.protocol.insert - Insert completed

16:31:49.354 [main] DEBUG org.mongodb.driver.protocol.query - Sending query of namespace local.catalog on connection [connectionId{localValue:2, serverValue:17}] to server localhost:27017

16:31:49.358 [main] DEBUG org.mongodb.driver.protocol.query - Query completed

_id

55c93465cd73501ae89a80ba

catalog_id

catalog1

journal

Oracle Magazine

author

David A. Kelly

edition

November-December 2013

publisher

Oracle Publishing

title

Engineering as a Service

_id

55c93465cd73501ae89a80bb

catalog_id

catalog2

journal

Oracle Magazine

author

Tom Haunert

edition

November-December 2013

publisher

Oracle Publishing

title

Quintessential and Collaborative

16:31:49.364 [main] INFO org.mongodb.driver.connection - Closed connection [connectionId{localValue:2, serverValue:17}] to localhost:27017 because the pool has been closed.

16:31:49.365 [main] DEBUG org.mongodb.driver.connection - Closing connection connectionId{localValue:2, serverValue:17}

16:31:49.368 [main] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-3, inFlight=0, closed=true] closing connection

16:31:49.371 [cluster-ClusterId{value='55c93465cd73501ae89a80b9', description='null'}-localhost:27017] DEBUG org.mongodb.driver.connection - Closing connection connectionId{localValue:1, serverValue:16}

16:31:49.381 [main] DEBUG com.datastax.driver.core.Cluster - Shutting down

16:31:49.382 [main] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-1, inFlight=0, closed=true] closing connection

16:31:49.382 [main] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-2, inFlight=0, closed=true] closing connection

  1. Run the following commands in the mongo shell.

>use local

>show collections

>db.catalog.find()

The two documents migrated to MongoDB get listed as shown in Figure 6-18.

Source: Vohra Deepak (2015), Pro MongoDB™ Development, Apress; 1st ed. edition.
