Submitting Storm and Trident Topologies Programmatically

Storm is a powerful distributed computing platform. In Storm, a distributed computation is a directed graph of interconnected processors that exchange messages; in Storm's terms, this graph is a topology. Processors come in two flavors: spouts, which place new messages into the topology, and bolts, which process messages received through their connections to other nodes. Every message received by a bolt triggers a call to the bolt's event handler, which processes that message. Storm also provides a batch mode for message processing, called Trident.
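To make the spout/bolt vocabulary concrete, here is a toy, in-process model written against plain JDK types rather than Storm's API. A spout emits words, a bolt appends exclamation marks (in the spirit of the ExclamationTopology used later), and each emitted message triggers exactly one bolt call. The names ToyTopology, Spout, Bolt and run are hypothetical; real Storm components implement interfaces such as IRichSpout and IRichBolt and run distributed across worker processes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a linear topology: spout -> bolt -> sink.
// Hypothetical names; this is NOT Storm's API.
public class ToyTopology {

    // A "spout" places new messages into the topology.
    interface Spout {
        void emitAll(Consumer<String> downstream);
    }

    // A "bolt" handles one message per call and may emit to the next node.
    interface Bolt {
        void execute(String message, Consumer<String> downstream);
    }

    // Wires spout -> bolt -> collector and returns everything the bolt emitted.
    static List<String> run(Spout spout, Bolt bolt) {
        List<String> sink = new ArrayList<>();
        // Each message from the spout triggers exactly one bolt invocation.
        spout.emitAll(msg -> bolt.execute(msg, sink::add));
        return sink;
    }

    public static void main(String[] args) {
        Spout words = downstream -> {
            for (String w : new String[] {"storm", "trident", "nimbus"}) {
                downstream.accept(w);
            }
        };
        Bolt exclaim = (msg, downstream) -> downstream.accept(msg + "!!!");
        System.out.println(run(words, exclaim)); // [storm!!!, trident!!!, nimbus!!!]
    }
}
```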

In both Storm and Trident, topologies are configured through client-side API calls, then serialized and submitted to the Storm cluster coordinator (Nimbus), with Apache Thrift as the RPC layer. Storm provides a convenient CLI tool for submitting topologies to Nimbus. However, there is no documented approach to submitting topologies exclusively through the Storm API. Submitting through the API has advantages over spawning an external process for each submission: it is faster, less resource-intensive, and simplifies error handling, since failures surface as exceptions in the calling code rather than in a child process's exit status and output.

Storm CLI

In the Storm CLI, a Storm or Trident topology can be submitted with a command similar to:

$ bin/storm jar {path-to-uber-jar} {topology-submission-main-class} {arg1} {arg2} ...

The bin/storm command (a Python script) reads the default configuration from defaults.yaml on the classpath and combines it with either the local Storm configuration in ~/.storm/storm.yaml or the configuration file given through the --config option. It then spawns a new Java process with a few custom system properties, such as the location of the submitted uber JAR, and invokes the main class, which typically contains code like the following:
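The configuration layering described above amounts to a map merge in which later sources override earlier ones. The sketch below models it with plain JDK maps; the keys are real Storm setting names, but the values and the mergeLayers helper are illustrative assumptions, not the CLI's actual code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigLayers {

    // Hypothetical helper: merges configuration layers in order, so a key in a
    // later layer overrides the same key in any earlier layer.
    @SafeVarargs
    static Map<String, Object> mergeLayers(Map<String, Object>... layers) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map<String, Object> layer : layers) {
            merged.putAll(layer);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for defaults.yaml (illustrative values).
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("nimbus.host", "localhost");
        defaults.put("nimbus.thrift.port", 6627);

        // Stand-in for ~/.storm/storm.yaml or the file passed via --config.
        Map<String, Object> user = new HashMap<>();
        user.put("nimbus.host", "nimbus.mydomain.com");

        Map<String, Object> effective = mergeLayers(defaults, user);
        System.out.println(effective.get("nimbus.host"));        // nimbus.mydomain.com
        System.out.println(effective.get("nimbus.thrift.port")); // 6627
    }
}
```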

StormTopology topology = buildTopology();
StormSubmitter.submitTopology("my-topology", conf, topology);

StormSubmitter uploads the JAR file referenced by the storm.jar system property to Nimbus, then calls Nimbus again, passing the remote location of the JAR file, the serialized topology, and the topology configuration.
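The two round trips can be sketched against a hypothetical interface. NimbusApi, its two methods, and the in-memory RecordingNimbus fake are invented for illustration; how the real StormSubmitter performs the upload over Thrift differs between Storm versions. The point is only the ordering: the upload must finish and return a remote location before the topology that refers to it is submitted.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseSubmit {

    // Hypothetical stand-in for the Nimbus RPC surface; NOT Storm's API.
    interface NimbusApi {
        String uploadJar(String localJarPath);
        void submitTopology(String name, String remoteJarLocation, String jsonConf, Object topology);
    }

    // Upload first, then submit with the remote location returned by the upload.
    static void submit(NimbusApi nimbus, String name, String localJar, String jsonConf, Object topology) {
        String remoteJar = nimbus.uploadJar(localJar);
        nimbus.submitTopology(name, remoteJar, jsonConf, topology);
    }

    // Hypothetical in-memory fake that records the calls it receives.
    static class RecordingNimbus implements NimbusApi {
        final List<String> calls = new ArrayList<>();

        @Override
        public String uploadJar(String localJarPath) {
            calls.add("upload " + localJarPath);
            return "/nimbus/inbox/" + localJarPath; // fake remote location
        }

        @Override
        public void submitTopology(String name, String remoteJarLocation, String jsonConf, Object topology) {
            calls.add("submit " + name + " from " + remoteJarLocation);
        }
    }

    public static void main(String[] args) {
        RecordingNimbus nimbus = new RecordingNimbus();
        submit(nimbus, "my-topology", "app.jar", "{}", new Object());
        // [upload app.jar, submit my-topology from /nimbus/inbox/app.jar]
        System.out.println(nimbus.calls);
    }
}
```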

The above workflow is the same for both Storm and Trident topologies, as Trident topologies are compiled into regular Storm topologies before submission.

Programmatic submission of topologies

For programmatic submission, we reconstruct the process above without setting any global system properties, avoiding side effects on other code running in the same JVM.

Configuration

First, we configure topology-specific settings. Some of these (e.g. the number of workers) could instead be supplied by the code that builds the topology rather than set at this stage.

Next, we load the default configuration to retrieve the common settings needed to talk to the Nimbus server. Only a few values from the default configuration are copied into the submitted configuration, because submitted values override the cluster-wide settings on the servers (worker nodes). For example, had we added the complete default configuration, the worker processes running the topology would be set up with the ZooKeeper address localhost:2181, which is wrong in most deployments.

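// Settings that belong to this topology only
Config topologyConf = new Config();
topologyConf.setDebug(true);
topologyConf.setNumWorkers(6);

// Default configuration, used only as a source of Nimbus connection settings
Map defaultConf = Utils.readStormConfig();

// Submitted configuration: Nimbus connection settings plus the topology settings
Map<String, Object> conf = new HashMap<>();
conf.put(Config.NIMBUS_HOST, "nimbus.mydomain.com");
conf.put(Config.NIMBUS_THRIFT_PORT, defaultConf.get(Config.NIMBUS_THRIFT_PORT));
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, defaultConf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN));

conf.putAll(topologyConf);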
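The selective copy above can be factored into a helper that takes only whitelisted keys from the defaults and then overlays the topology settings. The string keys are the values behind Storm's Config constants (for example, Config.NIMBUS_THRIFT_PORT is "nimbus.thrift.port"); the SubmissionConf helper and the sample values are assumptions for illustration, and plain JDK maps are used so the sketch runs without Storm on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the whitelist approach; not part of Storm.
public class SubmissionConf {

    // Client-side settings worth copying from the defaults; everything else
    // (e.g. storm.zookeeper.servers) is left to the cluster's own configuration.
    static final List<String> CLIENT_KEYS =
            Arrays.asList("nimbus.thrift.port", "storm.thrift.transport");

    static Map<String, Object> build(Map<String, Object> defaults, String nimbusHost,
                                     Map<String, Object> topologyConf) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("nimbus.host", nimbusHost);
        for (String key : CLIENT_KEYS) {
            // Skip keys the defaults do not define instead of submitting nulls.
            if (defaults.containsKey(key)) {
                conf.put(key, defaults.get(key));
            }
        }
        // Topology settings go last so they win over anything copied above.
        conf.putAll(topologyConf);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("nimbus.thrift.port", 6627);
        defaults.put("storm.zookeeper.servers", Arrays.asList("localhost"));

        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("topology.workers", 6);

        Map<String, Object> conf = build(defaults, "nimbus.mydomain.com", topologyConf);
        // The local ZooKeeper default is deliberately not forwarded to the workers.
        System.out.println(conf.containsKey("storm.zookeeper.servers")); // false
    }
}
```

Unlike the snippet above, this variant skips whitelisted keys that the defaults do not define, so no null values end up in the submitted configuration.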

Code Upload

When configuration is complete, the uber JAR file is uploaded to Nimbus:

String inputJar = "storm-test-1.0-SNAPSHOT-jar-with-dependencies.jar";
String remoteJar = StormSubmitter.submitJar(conf, inputJar);
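Because the JAR is uploaded before the topology is built, a missing or mis-packaged JAR only shows up after the upload has already happened. A cheap pre-flight check, sketched here with the JDK's java.util.jar package, is to confirm that the file opens as a JAR and contains the builder class before calling submitJar. The JarPreflight helper is hypothetical; the JAR and class names are the ones used in this post.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical pre-flight helper; not part of Storm.
public class JarPreflight {

    // Returns true if jarPath is a readable JAR that contains className.
    // A class name maps to an entry such as "io/modio/blog/storm/submit/ExclamationTopology.class".
    static boolean containsClass(String jarPath, String className) {
        File file = new File(jarPath);
        if (!file.isFile()) {
            return false;
        }
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(file)) {
            return jar.getJarEntry(entryName) != null;
        } catch (IOException e) {
            // Unreadable file or not a valid zip/JAR archive.
            return false;
        }
    }

    public static void main(String[] args) {
        String jar = "storm-test-1.0-SNAPSHOT-jar-with-dependencies.jar";
        String builder = "io.modio.blog.storm.submit.ExclamationTopology";
        if (!containsClass(jar, builder)) {
            System.err.println("Refusing to upload: " + builder + " not found in " + jar);
        }
    }
}
```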

Topology Submission

Next, the topology is built by calling the designated builder method inside the uber JAR, which is loaded through a dedicated class loader:

StormTopology topology = getTopology(inputJar,
	"io.modio.blog.storm.submit.ExclamationTopology", "buildTopology");

// Loads className from the JAR at jarPath and invokes its public static,
// no-argument method methodName, which must return a StormTopology.
private static StormTopology getTopology(String jarPath, String className, String methodName)
	throws MalformedURLException, ClassNotFoundException, NoSuchMethodException,
	IllegalAccessException, InvocationTargetException {
	// File.toURI() yields a well-formed file: URL for relative and absolute paths;
	// "file://" + a relative path would make the file name the URL's host.
	URL jarUrl = new File(jarPath).toURI().toURL();
	ClassLoader loader = URLClassLoader.newInstance(new URL[] { jarUrl },
		RemoteSubmitter.class.getClassLoader());

	Class<?> clazz = loader.loadClass(className);
	Method method = clazz.getMethod(methodName);

	// A null target works because the builder method is static.
	return (StormTopology) method.invoke(null);
}
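Two details of getTopology are easy to get wrong, and both can be checked with the JDK alone. First, concatenating "file://" with a relative path makes the JDK read the file name as a host name, whereas File.toURI() produces a proper file: URI with an absolute path. Second, the builder must be a public static method with no parameters, because it is invoked with a null target and no arguments. DemoBuilder below is a hypothetical stand-in for ExclamationTopology that returns a String instead of a StormTopology.

```java
import java.io.File;
import java.lang.reflect.Method;
import java.net.URI;

public class BuilderInvocation {

    // Hypothetical stand-in for ExclamationTopology.
    public static class DemoBuilder {
        public static String buildTopology() {
            return "demo-topology";
        }
    }

    // Same pattern as getTopology: look up a public no-arg method and call it
    // with a null target, which only works for static methods.
    static Object invokeStaticBuilder(Class<?> clazz, String methodName) throws Exception {
        Method method = clazz.getMethod(methodName);
        return method.invoke(null);
    }

    // Host component the JDK assigns to "file://" + path.
    static String hostOfConcatenatedUrl(String relativePath) {
        return URI.create("file://" + relativePath).getHost();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invokeStaticBuilder(DemoBuilder.class, "buildTopology")); // demo-topology

        String jar = "demo.jar";
        System.out.println(hostOfConcatenatedUrl(jar));      // demo.jar (parsed as a host!)
        System.out.println(new File(jar).toURI().getHost()); // null: no host, absolute path instead
    }
}
```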

A Thrift client for Nimbus is then obtained and the topology is submitted:

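NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
	// The configuration travels as a JSON string, the topology as a Thrift struct.
	client.getClient().submitTopology("my-topology", remoteJar,
		JSONValue.toJSONString(conf), topology);
} catch (AlreadyAliveException e) {
	// A topology with this name is already running on the cluster.
	// Other checked exceptions (e.g. InvalidTopologyException) propagate to the caller.
	e.printStackTrace();
} finally {
	client.close();
}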
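Since the configuration crosses the wire as a JSON string, every value in conf should be something a JSON encoder can represent. A pre-flight check along these lines can catch, say, a stray custom object before Nimbus sees it. JsonConfCheck is a hypothetical helper, not Storm's own validation, and it assumes the acceptable shapes are strings, finite numbers, booleans, null, lists, and maps with string keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; not part of Storm.
public class JsonConfCheck {

    // Returns true if value consists only of JSON-representable types:
    // null, String, finite Number, Boolean, List of such values, or Map with String keys.
    static boolean isJsonFriendly(Object value) {
        if (value instanceof Double || value instanceof Float) {
            double d = ((Number) value).doubleValue();
            return !Double.isNaN(d) && !Double.isInfinite(d); // JSON has no NaN/Infinity
        }
        if (value == null || value instanceof String
                || value instanceof Number || value instanceof Boolean) {
            return true;
        }
        if (value instanceof List) {
            for (Object item : (List<?>) value) {
                if (!isJsonFriendly(item)) {
                    return false;
                }
            }
            return true;
        }
        if (value instanceof Map) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                if (!(entry.getKey() instanceof String) || !isJsonFriendly(entry.getValue())) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.workers", 6);
        conf.put("topology.debug", true);
        conf.put("storm.zookeeper.servers", Arrays.asList("zk1", "zk2"));
        System.out.println(isJsonFriendly(conf)); // true

        conf.put("my.custom.object", new Object());
        System.out.println(isJsonFriendly(conf)); // false
    }
}
```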
