Thursday

Optimizing Spark Job Performance With Apache Ignite (Part 1)

Apache Ignite's memory-centric architecture makes RDD sharing efficient: IgniteContext and IgniteRDD let Spark applications share RDDs with one another. Come see how they work!

Portions of this article were taken from my book, High-Performance In-Memory Computing With Apache Ignite. If this post got you interested, check out the rest of the book for more helpful information.

Apache Ignite offers two ways to improve a Spark job's performance: Ignite RDD, which represents an Ignite cache as a Spark RDD abstraction, and Ignite IGFS, an in-memory file system that can be transparently plugged into Spark deployments. Ignite RDD makes it easy to share state in memory between different Spark jobs or applications. With Ignite's in-memory shared RDDs, any Spark job can put data into an Ignite cache that other Spark jobs can access later. Ignite RDD is implemented as a view over the Ignite distributed cache, which can be deployed either within the Spark job execution process or on a Spark worker.

Before we move on to more advanced topics, let's have a look at the history of Spark and what kinds of problems can be solved by Ignite RDDs.
Apache Spark was created at UC Berkeley's AMPLab for fast computation. It builds on the ideas of Hadoop MapReduce and extends the MapReduce model to efficiently support more types of operations, such as interactive queries and stream processing.
The main difference between Spark and Hadoop MapReduce is that during execution, Spark tries to keep data in memory, whereas Hadoop MapReduce shuffles data into and out of disk. Hadoop MapReduce takes significant time to write intermediate data to disk and read it back. Eliminating these redundant disk operations makes Spark orders of magnitude faster. Spark can keep intermediate data in memory without any disk I/O, so you can keep operating on the same data very quickly.
To keep data in memory, Spark provides a special dataset called the Spark RDD (Resilient Distributed Dataset). The Spark RDD is a fundamental component of the Apache Spark large-scale data processing framework. The following illustration shows iterative operations on Spark RDD.
Note that the above figure is obtained from the Spark documentation. Spark RDD is an immutable, fault-tolerant distributed collection of data elements. You can imagine Spark RDD as a Hadoop HDFS in memory. Spark RDD supports two types of operations:
  1. Transformations, which create a new dataset from an existing one
  2. Actions, which return a value by performing a computation on the RDD (as shown in the next figure)
Spark RDDs are created through Spark transformation functions, which can build RDDs from various sources, such as text files. In addition to text files, Spark RDDs may be created from external storage such as an RDBMS, HBase, Cassandra, or any other data source compatible with the Hadoop input format.
Most of the time, Spark RDDs are transformed from one RDD to another new Spark RDD in order to prepare the dataset for future processing. Let's consider the following data transformation steps in Spark:
  1. Load a text file with airline names and arrival times for any airport into RDD1.
  2. Load a text file with airline names and flight delay information for any airport into RDD2.
  3. Join RDD1 and RDD2 by airline names to get RDD3.
  4. Map on RDD3 to get a nice report for each airline as RDD4.
  5. Save RDD4 to file.
  6. Map RDD2 to extract the information of flight delay for certain airlines to get RDD5.
  7. Aggregate the RDD5 to get a count of how many flights are delayed for each airline as RDD6.
  8. Save the RDD6 into HDFS.
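Steps 6 and 7 above can be sketched in plain Java as a rough analogy. This is not the Spark API: Java streams stand in for RDDs, with the lazy filter playing the role of a transformation and the terminal collect playing the role of an action. The Flight class, the airline names, and the delay values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java stand-in for steps 6 and 7 above; this is not the Spark API.
final class FlightDelaySketch {

    // Hypothetical record of one flight: airline name and delay in minutes.
    static final class Flight {
        final String airline;
        final int delayMinutes;

        Flight(String airline, int delayMinutes) {
            this.airline = airline;
            this.delayMinutes = delayMinutes;
        }
    }

    // Keeps only delayed flights, then counts them per airline (sorted by airline name).
    static Map<String, Long> countDelayedByAirline(List<Flight> flights) {
        return flights.stream()
                .filter(f -> f.delayMinutes > 0)          // lazy step, comparable to a transformation
                .collect(Collectors.groupingBy(           // terminal step, comparable to an action
                        f -> f.airline, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Flight> rdd2 = Arrays.asList(
                new Flight("AirA", 15),
                new Flight("AirA", 0),
                new Flight("AirB", 40),
                new Flight("AirA", 5));
        System.out.println(countDelayedByAirline(rdd2)); // prints {AirA=2, AirB=1}
    }
}
```

As with an RDD pipeline, nothing is computed until the terminal step runs; the result lives only inside this one program, which is exactly the limitation discussed next.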
Spark actions such as count or reduce are used to perform computations on an RDD dataset. But there is one problem with the Spark RDD: it cannot be shared between Spark jobs or SparkContexts, because an RDD is bound to a single Spark application. With the native Spark distribution, the only way to share RDDs between different Spark jobs is to write the dataset to HDFS or somewhere else in the file system and then read it back in the other jobs. However, the same functionality can be achieved by using Alluxio (formerly Tachyon) or Apache Ignite.
Apache Ignite's memory-centric architecture enables RDD sharing in a very efficient and effective way. Apache Ignite provides IgniteContext and IgniteRDD to share RDDs between Spark applications.
  1. IgniteContext: IgniteContext is the main entry point to the Spark-Ignite integration. To create an instance of an Ignite context, a user must provide an instance of SparkContext and a closure creating IgniteConfiguration (configuration factory). The Ignite context will make sure that server or client Ignite nodes exist in all involved job instances. Alternatively, a path to an XML configuration file can be passed to the IgniteContext constructor, which will be used to configure the nodes being started.
  2. IgniteRDD: IgniteRDD is an implementation of the Spark RDD abstraction representing a live view of an Ignite cache. IgniteRDD is not immutable; all changes in the Ignite cache (regardless of whether they were caused by another RDD or by external changes in the cache) will be visible to RDD users immediately. IgniteRDD utilizes the partitioned nature of Ignite caches and provides partitioning information to the Spark executor. The number of partitions in IgniteRDD equals the number of partitions in the underlying Ignite cache. IgniteRDD also provides affinity information to Spark via getPreferredLocations so that RDD computations use data locality.
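The difference between an immutable dataset and a live view can be illustrated with a small plain-Java analogy. It uses neither Ignite nor Spark: a ConcurrentHashMap stands in for the Ignite cache, a copied map for an ordinary immutable RDD, and an unmodifiable view for an IgniteRDD. All names here are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for an Ignite cache and two ways of reading it; not the Ignite or Spark API.
final class LiveViewSketch {

    // Returns {size seen by the snapshot, size seen by the live view} after a later write.
    static int[] sizesAfterLaterWrite() {
        Map<Integer, String> cache = new ConcurrentHashMap<>();
        cache.put(1, "one");

        Map<Integer, String> snapshot = new HashMap<>(cache);               // copy: frozen at creation time
        Map<Integer, String> liveView = Collections.unmodifiableMap(cache);  // view: reads through to the cache

        cache.put(2, "two"); // a write made by "another job" after both readers were created

        return new int[] {snapshot.size(), liveView.size()};
    }

    public static void main(String[] args) {
        int[] sizes = sizesAfterLaterWrite();
        System.out.println("snapshot=" + sizes[0] + ", liveView=" + sizes[1]); // prints snapshot=1, liveView=2
    }
}
```

The copy never sees the second entry, while the view does; that is the sense in which an IgniteRDD reflects changes made to the cache by other jobs.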
In the next part of this series, we are going to install Apache Spark and do the following:
  1. Run the wordcount example to verify the Spark installation.
  2. Configure Apache Ignite to share RDDs between Spark applications.
  3. Run Spark applications through Spark Shell to use Ignite RDD.
  4. Develop a Scala Spark application to put some Ignite RDD into the Ignite cluster and pull them from another Scala Spark application.

Saturday

Book review: High Performance in-memory computing with Apache Ignite by Sadruddin Md

Read the full book review by Sadruddin Md.

Sunday

The Apache Ignite Native persistence, a brief overview

In-memory approaches can achieve blazing speed by putting the working set of the data into the system memory. When all data is kept in memory, the need to deal with issues arising from the use of traditional spinning disks disappears. This means, for instance, there is no need to maintain additional cache copies of data and manage synchronization between them. But there is also a downside to this approach: because the data is in memory only, it will not survive if the whole cluster is terminated. Therefore, these types of data stores are not considered persistent at all.
In this blog post, I will explore Apache Ignite's new native persistence feature and provide a clear, understandable picture of how it works.
In most cases, you can't (and should not) store the whole data set in memory for your application. Most often, you should store a relatively small hot or active subset of the data in memory to increase the performance of the application. The rest of the data should be stored on low-cost disks or tape for archiving. An in-memory database has two main storage requirements:
  • Permanent media, to store committed transactions, thereby maintaining durability, and for recovery purposes if the in-memory database needs to be reloaded into memory.
  • Permanent storage, to hold a backup copy of the entire in-memory database.
Permanent storage or media can be any distributed or local file system, SAN, NoSQL database, or even an RDBMS like Postgres or Oracle. Apache Ignite (since 1.5) provides an elegant way to connect persistent data stores such as an RDBMS or a NoSQL database like MongoDB or Cassandra. Most often, though, persistence in an RDBMS becomes a bottleneck, and you never get horizontal scaling in your system. For more information, I recommend having a look at the sample chapter of the book "High performance in-memory computing with Apache Ignite".


So, starting from version 2.1.0, Apache Ignite provides an ACID and SQL-compliant disk store that transparently integrates with Ignite's durable memory as an optional disk layer, storing data and indexes on SSD, Flash, 3D XPoint, and other types of non-volatile storage.
Apache Ignite native persistence uses the new durable memory architecture, which allows storing and processing data and indexes both in memory and on disk. Whenever the feature is enabled, Apache Ignite stores a superset of the data on disk and a subset of the data in RAM, based on its capacity. If a subset of data or an index is missing in RAM, the durable memory will take it from the disk, as shown in the pictures below.
Data can also be stored in central disk storage to which all the Ignite nodes are connected, as shown below.

Before we start, let's cover the prerequisites of the project in our sandbox:
  1. Apache Ignite version 2.1.0
  2. JVM 1.8
  3. Apache Maven version >3.0.3
  4. *nix based operating system
Installation.
There are basically two ways to use Apache Ignite:
  • Download the binary distribution, unzip the archive somewhere on your OS, and run the ./ignite.sh bash script with the Spring config files.
  • Create a Maven project with the required Apache Ignite dependencies, configure the node through Java code, and run it.
Here, I am going to use the first option.
Step 1.
  • Download the Apache Ignite binary distribution and unzip it somewhere in your sandbox.
  • Modify the IGNITE_HOME/examples/config/persistentstore/example-persistent-store.xml file and comment out the following part of the cache configuration.
<property name="cacheConfiguration">
    <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="name" value="testCache"/>
            <property name="backups" value="1"/>
            <property name="atomicityMode" value="TRANSACTIONAL"/>
            <property name="writeSynchronizationMode" value="FULL_SYNC"/>
            <property name="indexedTypes">
                <list>
                    <value>java.lang.Long</value>
                    <value>org.apache.ignite.examples.model.Organization</value>
                </list>
            </property>
        </bean>
    </list>
</property>
Note that to enable Ignite native persistence, you only need to pass the following configuration (an instance of PersistentStoreConfiguration), which is already pre-configured in the example-persistent-store.xml file.


<property name="persistentStoreConfiguration">
<bean class="org.apache.ignite.configuration.PersistentStoreConfiguration"/>
</property>
  • Run the following command from the IGNITE_HOME/bin directory.
./ignite.sh $IGNITE_HOME/examples/config/persistentstore/example-persistent-store.xml
Step 2.
  • Create a Maven project with the following command.
mvn archetype:generate -DgroupId=com.blu.imdg -DartifactId=ignite-persistence -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
  • Add the following dependencies to the pom.xml.
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>2.1.0</version>
</dependency>
  • Create a Java class with the following contents.
import java.util.Arrays;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

public class HelloWorld {
    public static void main(String[] args) {
        System.out.println("Hello Ignite");
        // create a new instance of TCP Discovery SPI
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        // create a new instance of the TCP discovery multicast IP finder
        TcpDiscoveryMulticastIpFinder tcMp = new TcpDiscoveryMulticastIpFinder();
        tcMp.setAddresses(Arrays.asList("localhost")); // change your IP address here
        // set the multicast IP finder for the SPI
        spi.setIpFinder(tcMp);
        // create a new Ignite configuration
        IgniteConfiguration cfg = new IgniteConfiguration();
        // run this application as a client node
        cfg.setClientMode(true);
        // set the discovery SPI on the Ignite configuration
        cfg.setDiscoverySpi(spi);
        // start Ignite
        Ignite ignite = Ignition.start(cfg);
        // activate the cluster (required when native persistence is enabled)
        ignite.active(true);
        // get or create the cache
        IgniteCache<Integer, String> cache = ignite.getOrCreateCache("testCache");
        // put some cache elements
        for (int i = 1; i <= 100; i++) {
            cache.put(i, Integer.toString(i));
        }
        // get them from the cache and write to the console
        for (int i = 1; i <= 100; i++) {
            System.out.println("Cache get:" + cache.get(i));
        }
        ignite.close();
    }
}
Note that we are using Ignite client mode to manipulate data. After running the HelloWorld application, 100 elements should be inserted into the cache (cache name testCache).
Step 3.
  • Let's use the ignitevisor command tool to examine the data. Run the cache -scan command in ignitevisor; you should see output similar to the illustration in your console, with all 100 elements in the cache.
  • Now, let's see what happened under the hood. Run the following command from the IGNITE_HOME/work directory.
du -h .
You should see output in your console similar to what is shown below.
If Apache Ignite native persistence is enabled, Ignite will persist all the data and the indexes in memory and on disk across all the cluster nodes.
If you go through the directory db/0_0_0_0_0_0_0_1_10_211_55_2_10_37_129_2_127_0_0_1_192_168_1_37_47500 (in my case), you will find a separate folder for every cache. The folder named cache-testCache contains all the cache entries (100 elements) that we have just inserted.
The file index.bin is the index of the cache entries, and every cache element gets its own page file. Why did this happen? Ignite's architecture is now page-based. Let's take a closer look: memory is split into regions, regions are split into segments, and segments are split into pages. Pages can be swapped to disk. Pages can store:
  • data 
  • metadata
  • index
Pages are fixed-length blocks, and the page-based design also supports automatic defragmentation. If you take a closer look at the page files, all of them are 14 KB in size. Whenever Ignite needs to load data from the disk, it just loads the page file, so it's very fast.
Another important concept is the write-ahead log (WAL). When you perform an update, Ignite first updates the data in memory and marks the page dirty, and then persists the data into the write-ahead log. Ignite simply appends the update to the WAL file. The WAL file is very similar to the Cassandra commitlog file, with one difference: Cassandra writes to memory and to the commitlog file on disk in parallel, whereas Ignite updates the data in memory first and then appends the data to the WAL. For more information, I recommend having a look at the documentation, which is quite exhaustive.
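The update order described above can be sketched with a toy model. This is only an illustration under simplifying assumptions, not Ignite's internal implementation: entries stand in for pages, an in-memory list stands in for the WAL file, and every class and method name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of the update order described above; names are hypothetical, not Ignite internals.
final class WalSketch {
    private final Map<Integer, String> memory = new HashMap<>(); // in-memory pages, simplified to entries
    private final Set<Integer> dirty = new HashSet<>();          // keys whose page is not yet written to disk
    private final List<String> wal = new ArrayList<>();          // append-only log, stand-in for the WAL file

    void update(int key, String value) {
        memory.put(key, value);      // 1. update the data in memory
        dirty.add(key);              // 2. mark the page dirty
        wal.add(key + "=" + value);  // 3. append the update to the log
    }

    List<String> log() {
        return wal;
    }

    Set<Integer> dirtyKeys() {
        return dirty;
    }

    // Rebuilds the state by replaying the log records in order, as a node might after a restart.
    static Map<Integer, String> replay(List<String> log) {
        Map<Integer, String> restored = new TreeMap<>();
        for (String record : log) {
            int sep = record.indexOf('=');
            restored.put(Integer.parseInt(record.substring(0, sep)), record.substring(sep + 1));
        }
        return restored;
    }

    public static void main(String[] args) {
        WalSketch node = new WalSketch();
        node.update(1, "a");
        node.update(1, "b");
        node.update(2, "c");
        System.out.println(replay(node.log())); // prints {1=b, 2=c}
    }
}
```

Because the log is append-only, a write costs one sequential append, and replaying the records in order reproduces the latest value for every key.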
Step 4.
  • Restart the Ignite node and check the cache testCache with ignitevisor. You may be surprised to find no data in the cache.
  • Let's slightly modify our HelloWorld class and run the application again. Comment out or delete the following fragment of the code, as shown below.
// put some cache elements
for (int i = 1; i <= 100; i++) {
  cache.put(i, Integer.toString(i));
}
Run the application and check the cache testCache through ignitevisor and your application console.
Whenever a read request occurs, Ignite first checks for the data in memory. If the data doesn't exist in memory, Ignite immediately loads the cache entries from disk into memory. Also note that all entries in memory are stored off-heap.
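The read path just described can be sketched as follows. This is a simplified model rather than Ignite code: two maps stand in for RAM and the on-disk page store, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a read that falls back to disk; the two maps stand in for RAM and the page store.
final class ReadThroughSketch {
    private final Map<Integer, String> memory = new HashMap<>();
    private final Map<Integer, String> disk = new HashMap<>();

    ReadThroughSketch(Map<Integer, String> persisted) {
        disk.putAll(persisted); // state found on disk after a restart; memory starts empty
    }

    String get(int key) {
        String value = memory.get(key);
        if (value == null) {
            value = disk.get(key);      // miss in memory: read from disk
            if (value != null) {
                memory.put(key, value); // keep it in memory for later reads
            }
        }
        return value;
    }

    boolean inMemory(int key) {
        return memory.containsKey(key);
    }

    public static void main(String[] args) {
        Map<Integer, String> persisted = new HashMap<>();
        persisted.put(1, "1");
        ReadThroughSketch node = new ReadThroughSketch(persisted);
        System.out.println(node.inMemory(1)); // prints false: nothing is loaded right after a restart
        System.out.println(node.get(1));      // prints 1: the value is loaded from "disk"
        System.out.println(node.inMemory(1)); // prints true: the first read brought it into memory
    }
}
```

This mirrors what Step 4 shows: the cache looks empty right after the restart, and the entries appear once the application reads them.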
Benefits.
With Ignite native persistence, you can now easily back up data for recovery; Denis Magda has written a comprehensive article on data recovery using Ignite native persistence. One thing I have to mention here is data replication between clusters. With Ignite native persistence, you can now replicate data from one cluster to another online. You can use any standard disk-based data replication tool to copy the changed data set from the primary data center to the stand-in data center or Ignite cluster.

Apache Ignite with Spring Data

Spring Data provides a unified and easy way to access different kinds of persistent stores, both relational database systems and NoSQL data stores. It sits on top of JPA, adding another layer of abstraction and defining a standards-based design to support the persistence layer in a Spring context.
Apache Ignite's IgniteRepository implements the Spring Data CrudRepository interface and extends the basic capabilities of CrudRepository, which in turn supports:
  1. Basic CRUD operations on a repository for a specific type.
  2. Access to the Apache Ignite SQL grid via Spring Data API.
With Spring Data's repositories, you only need to write an interface with finder methods to query the objects. All the CRUD methods for manipulating the objects are provided automatically. As an example:


@RepositoryConfig(cacheName = "DogCache")
public interface DogRepository extends IgniteRepository<Dog, Long> {
    List<Dog> getDogByName(String name);
    Dog getDogById (Long id);
}
In this article, we are going to cover the following topics:
  • Create a Maven project from scratch for using Spring Data with the Apache Ignite grid.
  • Persist a few entities into Ignite caches through the Spring Data framework.
Before we start, let's cover the prerequisites of the project in your sandbox:
  1. Java JDK 1.8
  2. Apache Ignite version 2.0
  3. Apache Maven version >3.0.3
Step 1
Let’s set up the sandbox first. Create a Maven project or clone the project from the GitHub repository.
mvn archetype:generate -DgroupId=com.blu.imdg -DartifactId=spring-data -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Step 2

Modify the pom.xml, add the following maven dependencies:
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring-data</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.195</version>
</dependency>
Note that the Maven h2 dependency is optional. If you get an error mentioning "org.h2.result.RowFactory", add the dependency explicitly.

The Domain Model

Our example domain model consists of two different entities: Breed and Dog.

The association from Dog to Breed is many-to-one: a dog can have only one breed.

Step 3

Now, let’s map the domain model by creating the Java classes and annotating them with the required meta-information. Let’s start with the Breed class.
package com.blu.imdg.model;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

import java.io.Serializable;

public class Breed implements Serializable {

    @QuerySqlField(index = true)
    private Long id;

    @QuerySqlField(index = true)
    private String name;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Breed{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}
Note that the @QuerySqlField annotation makes the fields available to SQL queries.
Create another class named Dog and add the following contents to it.

package com.blu.imdg.model;

import org.apache.ignite.cache.query.annotations.QuerySqlField;

import java.io.Serializable;
import java.sql.Date;

public class Dog implements Serializable {

    @QuerySqlField(index = true)
    private Long id;
    @QuerySqlField(index = true)
    private String name;
    @QuerySqlField(index = true)
    private Long breedid;
    @QuerySqlField(index = true)
    private Date birthdate;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getBreedid() {
        return breedid;
    }

    public void setBreedid(Long breedid) {
        this.breedid = breedid;
    }

    public Date getBirthdate() {
        return birthdate;
    }

    public void setBirthdate(Date birthdate) {
        this.birthdate = birthdate;
    }

    @Override
    public String toString() {
        return "Dog{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", breedid=" + breedid +
                ", birthdate=" + birthdate +
                '}';
    }
}

Step 4

Now, let's create the Spring repositories for the POJOs created above.
package com.blu.imdg.repositories;

import com.blu.imdg.model.Dog;
import org.apache.ignite.springdata.repository.IgniteRepository;
import org.apache.ignite.springdata.repository.config.RepositoryConfig;

import java.util.List;

@RepositoryConfig(cacheName = "DogCache")
public interface DogRepository extends IgniteRepository<Dog, Long> {
    List<Dog> getDogByName(String name);
    Dog getDogById (Long id);
}
The @RepositoryConfig annotation must be specified to map a repository to a distributed cache. We also have two finder methods, getDogByName and getDogById, for querying the cache.
Let's add a similar repository for the Breed domain as follows:
package com.blu.imdg.repositories;

import com.blu.imdg.model.Breed;
import org.apache.ignite.springdata.repository.IgniteRepository;
import org.apache.ignite.springdata.repository.config.Query;
import org.apache.ignite.springdata.repository.config.RepositoryConfig;
import org.springframework.data.domain.Pageable;

import java.util.List;

@RepositoryConfig(cacheName = "BreedCache")
public interface BreedRepository extends IgniteRepository<Breed, Long> {

    List<Breed> getAllBreedsByName (String name);

    @Query("SELECT id FROM Breed WHERE id = ?")
    List<Long> getById (long id, Pageable pageable);
}

In the above BreedRepository interface, we also use the @Query(queryString) annotation, which can be used when a specific SQL query needs to be executed as the result of a method call.
Step 5
Let’s create the cache configuration class. Create an Ignite cache configuration class and mark the application configuration with @EnableIgniteRepositories annotation, as shown below:
package com.blu.imdg.repositories;

import com.blu.imdg.model.Breed;
import com.blu.imdg.model.Dog;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.springdata.repository.config.EnableIgniteRepositories;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableIgniteRepositories
public class SpringAppConfig {
    @Bean
    public Ignite igniteInstance() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // Setting some custom name for the node.
        cfg.setIgniteInstanceName("springDataNode");
        // Enabling peer-class loading feature.
        cfg.setPeerClassLoadingEnabled(true);
        // Defining and creating a new cache to be used by Ignite Spring Data
        // repository.
        CacheConfiguration ccfgDog = new CacheConfiguration("DogCache");
        CacheConfiguration ccfgBreed = new CacheConfiguration("BreedCache");
        // Setting SQL schema for the cache.
        ccfgBreed.setIndexedTypes(Long.class, Breed.class);
        ccfgDog.setIndexedTypes(Long.class, Dog.class);

        cfg.setCacheConfiguration(new CacheConfiguration[]{ccfgDog, ccfgBreed});

        return Ignition.start(cfg);
    }
}
Note that we have used two separate CacheConfiguration instances for the Breed and Dog caches, and we have set the SQL schema for each cache.

Step 6

Once all the configurations and the repositories are ready to be used, we only need to register the configuration in a Spring application context.
package com.blu.imdg;

import com.blu.imdg.model.Breed;
import com.blu.imdg.model.Dog;
import com.blu.imdg.repositories.BreedRepository;
import com.blu.imdg.repositories.DogRepository;
import com.blu.imdg.repositories.SpringAppConfig;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.sql.Date;
import java.util.List;

/**
 * Hello world!
 *
 */
public class App 
{
    private static AnnotationConfigApplicationContext ctx;
    private static BreedRepository breedRepository;
    private static DogRepository dogRepository;

    public static void main( String[] args )
    {
        System.out.println( "Spring Data Example!" );
        ctx = new AnnotationConfigApplicationContext();
        ctx.register(SpringAppConfig.class);
        ctx.refresh();

        breedRepository = ctx.getBean(BreedRepository.class);
        dogRepository = ctx.getBean(DogRepository.class);

        //fill the repository with data and Save
        Breed collie = new Breed();
        collie.setId(1L);
        collie.setName("collie");
        //save Breed with name collie
        breedRepository.save(1L, collie);

        System.out.println("Add one breed in the repository!");
        // Query the breed
        List<Breed> getAllBreeds = breedRepository.getAllBreedsByName("collie");

        for(Breed breed : getAllBreeds){
            System.out.println("Breed:" + breed);
        }
        //Add some dogs
        Dog dina = new Dog();
        dina.setName("dina");
        dina.setId(1L);
        dina.setBreedid(1L);
        dina.setBirthdate(new Date(System.currentTimeMillis()));
        //Save Dina
        dogRepository.save(2L,dina);
        System.out.println("Dog dina save into the cache!");
        //Query the Dog Dina
        List<Dog> dogs = dogRepository.getDogByName("dina");
        for(Dog dog : dogs){
            System.out.println("Dog:"+ dog);
        }

    }
}
The above code snippet is very straightforward. First, we create a Spring annotated context and register our repositories. Next, we get references to our BreedRepository and DogRepository to insert some data. To query the data, we use basic CRUD operations or methods that will be automatically turned into Apache Ignite SQL queries:


List<Dog> dogs = dogRepository.getDogByName("dina");
for(Dog dog : dogs){
  System.out.println("Dog:"+ dog);
}
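To give a rough idea of how a finder method name can be turned into a query, here is a deliberately simplified sketch. It is not Spring Data's or Ignite's actual parser; it assumes names of the form "get" or "find", followed by the entity, "By", and a single property, and the toSql helper is hypothetical.

```java
// Simplified illustration of deriving a query from a finder method name.
// This is not Spring Data's actual parser; it handles only "<get|find><Entity>By<Property>".
final class FinderNameSketch {

    static String toSql(String methodName) {
        int by = methodName.indexOf("By");
        if (by < 0 || by + 2 >= methodName.length()) {
            throw new IllegalArgumentException("Not a finder method: " + methodName);
        }
        // Strip a leading "get" or "find" to obtain the entity name.
        String entity = methodName.substring(0, by).replaceFirst("^(get|find)", "");
        String property = methodName.substring(by + 2);
        // Lower-case the first letter so that "Name" becomes the field name "name".
        String column = Character.toLowerCase(property.charAt(0)) + property.substring(1);
        return "SELECT * FROM " + entity + " WHERE " + column + " = ?";
    }

    public static void main(String[] args) {
        System.out.println(toSql("getDogByName")); // prints SELECT * FROM Dog WHERE name = ?
        System.out.println(toSql("getDogById"));   // prints SELECT * FROM Dog WHERE id = ?
    }
}
```

A real implementation also has to handle plural forms, multiple properties joined with And or Or, and entity names that themselves contain "By"; this sketch ignores all of those.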

Step 7

Let’s build and run the application. Execute the following command.
mvn clean install
mvn exec:java -Dexec.mainClass=com.blu.imdg.App

You should find a lot of log messages in the console.


The log messages confirm that two entries (the dog dina and the breed collie) have been flushed into the Ignite cache and that the dog Dina has been retrieved from the cache. Let’s explore the cache through Ignite Visor.

Two different caches have been created for the entities: Breed and Dog. If we scan the cache entries of the Dog cache, we should find the following entity on it.

Entity Dina has been persisted into the cache with the key of the Breed collie.
If you want to learn more about Apache Ignite (using JPA, Hibernate, or MyBatis), please refer to the book High Performance in-memory computing with Apache Ignite.

Saturday

In-Memory MapReduce and Your Hadoop Ecosystem (Part 2)

Portions of this article were taken from the book High-Performance In-Memory Computing With Apache Ignite. If it got you interested, check out the rest of the book for more helpful information.
Before reading, be sure to check out Part 1!
Apache Ignite provides a vanilla distributed in-memory file system called Ignite File System (IGFS) with functionality similar to Hadoop HDFS. This is one of the unique features of Apache Ignite that helps accelerate Big Data computing. IGFS implements the Hadoop file system API and is designed to support Hadoop v1 and YARN (Hadoop v2). Ignite IGFS can be transparently plugged into Hadoop or Spark deployments.
One of the greatest benefits of IGFS is that it does away with the Hadoop NameNode in the Hadoop deployment; it seamlessly utilizes Ignite’s in-memory database under the hood to provide completely automatic scaling and failover without any additional shared storage. IGFS uses memory instead of disk to produce a distributed, fault-tolerant, and high-throughput file system. Removing the NameNode from the architecture leads to dramatically better performance of I/O operations. Furthermore, IGFS provides a native file system API for working with directories and files in the in-memory file system.
IgniteFileSystem, or the IGFS interface, provides methods for regular file system operations such as create, update, delete, mkdirs, etc., as well as MapReduce task executions. Another interesting feature of IGFS is its smart use of file-level caching and eviction. IGFS utilizes file-level caching to ensure corruption-free storage.
Note that IGFS is not merely an alternative to a RAM disk; it’s a fully compliant in-memory file system like HDFS. A high-level architecture of the IGFS is shown below in Figure 1.

In this article, we are going to cover the basic operations of IGFS and deploy IGFS in standalone mode to store files in it and perform a few MapReduce tasks on top of it.
Note: We are not going to replace HDFS completely; otherwise, we would not be able to start the Hadoop DataNode anymore. We are going to use both IGFS and HDFS simultaneously.
From a bird’s-eye view, running MapReduce in IGFS on top of HDFS looks as follows:
  1. Configure the IGFS for the Ignite nodes.
  2. Put files into IGFS.
  3. Configure Hadoop.
  4. Run MapReduce.
There are several ways to configure IGFS on the Ignite cluster. Unfortunately, Apache Ignite provides neither a comprehensive GUI-based management tool nor a command-line interface for maintaining the Hadoop accelerator. However, GridGain Visor (from the commercial version of Ignite) provides IGFS monitoring and file management between HDFS, local, and IGFS file systems. To demonstrate how to use IGFS, we will perform the following steps:
  1. Configure the IGFS file system in the Ignite cluster (default-config.xml).
  2. Run a standalone Java application to ingest a file into IGFS. In our case, the file will be the t8.shakespeare.txt.
  3. Configure Hadoop.
  4. Run a MapReduce wordcount job to compute the count of the words from the IGFS file.
  5. Run a standalone Java application to check the result of the MapReduce job.
Now that we have dipped our toes into IGFS, let’s configure the standalone IGFS and run some MapReduce jobs on it.

Step 1

Add the following Spring configuration beans to the default-config.xml file of the Ignite node:
<bean id="igfsCfgBase" class="org.apache.ignite.configuration.FileSystemConfiguration" abstract="true">
    <property name="blockSize" value="#{128 * 1024}"/>
    <property name="perNodeBatchSize" value="512"/>
    <property name="perNodeParallelBatchCount" value="16"/>
    <property name="prefetchBlocks" value="32"/>
</bean>
<bean id="dataCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="TRANSACTIONAL"/>
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
    <property name="backups" value="0"/>
    <property name="affinityMapper">
        <bean class="org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper">
            <constructor-arg value="512"/>
        </bean>
    </property>
</bean>
<bean id="metaCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
    <property name="cacheMode" value="REPLICATED"/>
    <property name="atomicityMode" value="TRANSACTIONAL"/>
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
</bean>
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                    <property name="addresses">
                        <list>
                            <value>127.0.0.1:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
    <property name="fileSystemConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.FileSystemConfiguration" parent="igfsCfgBase">
                <property name="name" value="igfs"/>
                <property name="metaCacheName" value="igfs-meta"/>
                <property name="dataCacheName" value="igfs-data"/>
                <property name="blockSize" value="1024"/>
                <property name="streamBufferSize" value="1024"/>
                <property name="ipcEndpointConfiguration">
                    <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration">
                        <property name="type" value="SHMEM"/>
                        <property name="host" value="127.0.0.1"/>
                        <property name="port" value="10500"/>
                    </bean>
                </property>
            </bean>
        </list>
    </property>
</bean>

Next, we have configured a base cache configuration called dataCacheCfgBase, which will be the parent of the IGFS data cache. We have already discussed most of the properties of this configuration. Note that for demonstration purposes, we have set the number of backups to 0.
The subsequent configuration, metaCacheCfgBase, is the base configuration for the metadata cache. It is probably the most unfamiliar part of this configuration. IGFS keeps metadata for all files ingested into the in-memory file system in this cache. Its configuration is very similar to the previous base cache configuration.
Next, we configure the IGFS file system itself, which is the main part of the Ignite configuration. We set the name of the IGFS file system to igfs. The block size and the stream buffer size of the file system are both set to 1024 bytes. To let IGFS accept requests from Hadoop, an endpoint must be configured. Ignite offers two endpoint types:
  1. shmem: works over shared memory (not available on Windows).
  2. tcp: works over the standard socket API.
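
Going back to the block size and the affinity mapper configured above, the following sketch works through the arithmetic: how many 1024-byte blocks a file occupies, and how many groups of 512 sequential blocks the IgfsGroupDataBlocksKeyMapper would keep together on one node. The file size, the class name, and the helper methods are hypothetical and chosen only for illustration; the sketch does not call any Ignite API.

```java
final class IgfsBlockMath {
    /** Number of blocks needed to hold fileSize bytes (ceiling division). */
    static long blockCount(long fileSize, int blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    /** Index of the group a block falls into when groupSize sequential blocks are kept together. */
    static long groupOf(long blockIndex, int groupSize) {
        return blockIndex / groupSize;
    }

    public static void main(String[] args) {
        long fileSize = 5L * 1024 * 1024; // hypothetical 5 MB file, not the real size of t8.shakespeare.txt
        int blockSize = 1024;             // blockSize of the "igfs" file system
        int groupSize = 512;              // constructor argument of IgfsGroupDataBlocksKeyMapper

        long blocks = blockCount(fileSize, blockSize);
        long groups = blockCount(blocks, groupSize); // same ceiling division, applied to blocks
        System.out.println(blocks + " blocks in " + groups + " groups");
        System.out.println("block 511 -> group " + groupOf(511, groupSize));
        System.out.println("block 512 -> group " + groupOf(512, groupSize));
    }
}
```

A small block size such as 1024 bytes produces many blocks even for a modest file, so the group size largely determines how widely a single file is spread across the data nodes.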

Step 2

Once each Ignite node is configured (default-config.xml), start every node with the following command:
$ ignite.sh

Step 3

In this step, we are going to ingest our t8.shakespeare.txt file into the IGFS file system. As described before, we will use a Java application to do this. The application is very simple: each time it is launched, it ingests the t8.shakespeare.txt file once. It takes the name of the directory and the name of the file as input parameters and puts the file into IGFS. Open the pom.xml file and add the following dependencies to the dependencies section.
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-hadoop</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
Now, add a new Java class with the name IngestFileInIGFS. The full listing of the Java class is shown below:
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.Ignition;
import org.apache.ignite.igfs.IgfsPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IngestFileInIGFS {
 private final static Logger LOGGER = LoggerFactory.getLogger(IngestFileInIGFS.class);
 private final static String IGFS_FS_NAME = "igfs";

 public static void main(String... args) {
  if (args.length < 2) {
   LOGGER.error("Usage: java -jar chapter-bigdata-1.0-SNAPSHOT.jar DIRECTORY_NAME FILE_NAME, for example: java -jar chapter-bigdata-1.0-SNAPSHOT.jar myDir myFile");
   System.exit(1);
  }
  // Client mode has to be set before the node is started.
  Ignition.setClientMode(true);
  Ignite ignite = Ignition.start("default-config.xml");
  // Log the names of all IGFS file systems known to this node.
  for (IgniteFileSystem igniteFileSystem : ignite.fileSystems()) {
   LOGGER.info("IGFS File System name:" + igniteFileSystem.name());
  }
  IgniteFileSystem igfs = ignite.fileSystem(IGFS_FS_NAME);
  // Create directory.
  IgfsPath dir = new IgfsPath("/" + args[0]);
  igfs.mkdirs(dir);
  // Path of the file to create inside the directory.
  IgfsPath file = new IgfsPath(dir, args[1]);
  // Read the Shakespeare file from the classpath.
  InputStream inputStream = IngestFileInIGFS.class.getClassLoader().getResourceAsStream("t8.shakespeare.txt");
  byte[] filesToByte;
  try {
   filesToByte = ByteStreams.toByteArray(inputStream);
   // Create the file in IGFS (overwriting an existing one) and write the data to it.
   OutputStream out = igfs.create(file, true);
   out.write(filesToByte);
   out.close();
  } catch (IOException e) {
   LOGGER.error(e.getMessage());
  } finally {
   try {
    inputStream.close();
   } catch (IOException e) {
    LOGGER.error(e.getMessage());
   }
  }
  LOGGER.info("Created file path:" + file.toString());
 }
}
To compile and run the application, execute the following commands:

mvn clean install
java -jar ./IngestFileInIGFS.jar myDir myFile
After the Maven project compiles successfully, the executable JAR files are in the target folder. The IngestFileInIGFS.jar file ingests a file into IGFS.
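
The listing above loads the whole file into a byte array before writing it, which is fine for a few megabytes. For larger files, a chunked copy may be preferable. The following is a minimal sketch of that idea using only the JDK; the StreamCopy class and its copy method are hypothetical helpers, and in-memory streams stand in for the classpath resource and for the stream returned by igfs.create.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class StreamCopy {
    /** Copies everything from in to out in fixed-size chunks and returns the number of bytes copied. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        // read() returns -1 at end of stream; otherwise the number of bytes placed in the buffer.
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] sample = "To be, or not to be".getBytes(StandardCharsets.UTF_8);
        // In-memory streams are stand-ins for the resource stream and the IGFS output stream.
        try (InputStream in = new ByteArrayInputStream(sample);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            long copied = copy(in, out, 8);
            System.out.println("Copied " + copied + " bytes");
        }
    }
}
```

Because the helper accepts any OutputStream, the stream returned by igfs.create(file, true) in the listing could be passed to it in place of the in-memory stream.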

Step 4

It’s time to configure Hadoop (the IGFS file system must be configured in Hadoop).
Let’s create a new directory under $HADOOP_HOME/etc and copy all the files from the hadoop directory into it. Execute the following commands:
$ cd $HADOOP_HOME/etc
$ mkdir hadoop-ignite
$ cp ./hadoop/*.* ./hadoop-ignite
Remove all the properties from $HADOOP_HOME/etc/hadoop-ignite/core-site.xml and add the following properties:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>igfs://igfs@127.0.0.1:10500/</value>
</property>
<property>
<name>fs.igfs.impl</name>
<value>org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem</value>
</property>
</configuration>
The fully qualified file system class name org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem is sufficient for configuring IGFS for Hadoop.
Note: v1 and v2 don’t stand for Hadoop 1.x and Hadoop 2.x. Instead, they refer to the old FileSystem API and the new AbstractFileSystem API, respectively.
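
To see which part of the fs.defaultFS value carries which setting, the value can be taken apart with the JDK's URI class. This is only a sketch, and it assumes the endpoint is written in the form igfs://name@host:port/, where the part before the @ is the IGFS name from the Ignite configuration. The IgfsUriParts class and its describe method are hypothetical and are not part of Ignite or Hadoop.

```java
import java.net.URI;

final class IgfsUriParts {
    /** Returns "scheme | IGFS name | host | port" for an igfs:// URI. */
    static String describe(String value) {
        URI uri = URI.create(value);
        return uri.getScheme() + " | " + uri.getUserInfo() + " | " + uri.getHost() + " | " + uri.getPort();
    }

    public static void main(String[] args) {
        // Assumed form: igfs://<IGFS name>@<endpoint host>:<endpoint port>/
        System.out.println(describe("igfs://igfs@127.0.0.1:10500/"));
    }
}
```

The host and port correspond to the ipcEndpointConfiguration of the file system, and the name corresponds to its name property.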
At this point, the Hadoop configuration is complete, and we are ready to execute MapReduce jobs.

Step 5

There are several ways to execute MapReduce jobs with a given Hadoop configuration. One of the easiest is to pass the Hadoop config directory as an input parameter to the job as follows:
hadoop --config [path_to_config] [arguments]

Let’s run our wordcount MapReduce job with the file from the IGFS with the following command:

time hadoop --config /home/user/hadoop/hadoop-2.7.2/etc/hadoop-ignite jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /myDir/myFile /myDir/out

After running the above command, you should see the output of the wordcount job in your terminal.


Note that you have to change the name of the output directory every time you run the MapReduce job.
In this simple way, you can use IGFS in place of HDFS for your MapReduce jobs.
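
To sanity-check the job's result on a small sample, the counting logic can be reproduced locally. The sketch below assumes the job splits each line on whitespace, as the stock Hadoop wordcount example does, and sums the occurrences of each token. The LocalWordCount class and the sample lines are hypothetical; nothing here touches Hadoop or IGFS.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

final class LocalWordCount {
    /** Counts whitespace-separated tokens: tokenizing plays the map role, summing the reduce role. */
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // keeps words in sorted order
        for (String line : lines) {
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens()) {
                counts.merge(tokens.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample input, not taken from t8.shakespeare.txt.
        List<String> sample = Arrays.asList("to be or not to be", "that is the question");
        // Print one "word<TAB>count" pair per line.
        count(sample).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

Under this assumption, tokens are case-sensitive and punctuation stays attached to the words, so "be" and "be," are counted separately.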

Tuesday

An impatient start with Apache Ignite machine learning grid

Recently, Apache Ignite 2.0 introduced a beta version of the in-memory machine learning grid, a distributed machine learning library built on top of the Apache Ignite IMDG. This beta release of the ML library can perform local and distributed vector and matrix algebra operations as well as decompositions. The data structures can be stored in the Java heap, off-heap, or in distributed Ignite caches. At this moment, the Apache Ignite ML grid doesn't support any prediction or recommendation analysis. In this short post, we are going to download the new Apache Ignite 2.0 release, build the examples, and run them.

1. Download and unpack the Apache Ignite 2.0 distribution.

Download the Apache Ignite 2.0 binary release. Unpack the distribution somewhere on your workstation (e.g., /home/ignite/2.0) and set the IGNITE_HOME environment variable to that directory.

2. Start the Apache Ignite remote node

Run the following command in the terminal window.
ignite.sh examples/config/example-ignite.xml 

Note that remote nodes for the examples should always be started with the special configuration file that enables P2P class loading: `examples/config/example-ignite.xml`.

Also, note that Apache Ignite version 2.0 needs Java version 1.8 or higher.

3. Build the machine learning examples

Go to the /examples folder of the Apache Ignite distribution. If you have already installed and configured Maven, run the following command from the examples folder.

mvn clean install -Pml

The above command will activate the machine learning (ml) profile and build the project.

4. Run it

Let's run the simple local on-heap version of the Vector example. Execute the following command in your terminal window:

mvn exec:java -Dexec.mainClass=org.apache.ignite.examples.ml.math.vector.VectorExample

You should see the example's log output in your console.


All the examples are self-contained and don't need any special configuration. Examples with 'Cache' in their name, such as CacheMatrixExample or CacheVectorExample, need a remote Ignite node with P2P class loading enabled. Let's run the CacheMatrixExample with the following command.
mvn exec:java -Dexec.mainClass=org.apache.ignite.examples.ml.math.matrix.CacheMatrixExample

You should see the example's output in your console.


Additionally, the Apache Ignite ML grid provides a simple utility class that allows pretty-printing of matrices and vectors. You can run the TracerExample as follows:
mvn exec:java -Dexec.mainClass=org.apache.ignite.examples.ml.math.tracer.TracerExample

The above command will launch a web browser and show some HTML output.


This is enough for now. If you liked this post, you may also like the book.

Saturday

Unboxing of the first copy of the book High performance in-memory computing with Apache Ignite

Yesterday I got the first paperback copy of the book High performance in-memory computing with Apache Ignite.







The book is available from Lulu.com and the Amazon bookstore.


Product details

  • Paperback: 360 pages
  • Language: English
  • ISBN-10: 1365732355
  • ISBN-13: 978-1365732355
  • Product Dimensions: 8.3 x 0.8 x 11 inches
  • Shipping Weight: 1.8 pounds

Happy reading!!

