WSO2 File Processing

File Processing on WSO2: A Simple File Processing Example

This example sets up a simple file processing orchestration on the ESB which reads a CSV file from a watched folder, transforms it and writes it to another folder in an arbitrary format.

Initial attempts to process .csv files on the WSO2 service bus were based on the example in http://wso2.com/library/blog-post/2013/09/csv-to-xml-transformation-with-wso2-esb-smooks-mediator/, which works in the following way:

[Figure: wso2flow1, the poll/transform/write process flow]

1. We set up a VFS Reader Proxy to poll a defined folder for files

2. When a file is found, we use a Smooks transformation to convert the file into an XML structure

3. We then use a WSO2 XSLT transform to convert the XML to .csv format.

Let's get started!

 

Setting up the Demo

(We assume you have set up the WSO2 ESB as standard.)

Step 1. Enable the VFS Transports

Full instructions are in the WSO2 documentation, but essentially: edit the file <ESB_HOME>/repository/conf/axis2/axis2.xml and uncomment the lines:

<transportReceiver name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>
<transportSender name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>

Then restart the ESB.

Step 2. Create the output stylesheet

The simple XSLT file I used was:

<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:output omit-xml-declaration="yes" method="text" encoding="iso-8859-1"/>
  <xsl:strip-space elements="*"/>
  <xsl:template match="/*/child::*">
    <xsl:for-each select="child::*">
      <xsl:if test="position() != last()"><xsl:value-of select="normalize-space(.)"/>,</xsl:if>
      <xsl:if test="position() = last()"><xsl:value-of select="normalize-space(.)"/><xsl:text>&#10;</xsl:text></xsl:if>
    </xsl:for-each>
  </xsl:template>
</xsl:stylesheet>
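
As a quick worked example of what this stylesheet does (the element names here are made up; any two-level XML works the same way), input like:

<records>
  <record>
    <name>Lakmali</name>
    <surname>Erandi</surname>
  </record>
</records>

comes out as the text line:

Lakmali,Erandi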

The output sequence we build later pulls this stylesheet from the registry via:

<xslt key="gov:/xmltocsv.xsl"/>

This stylesheet therefore needs to be added to the WSO2 governance registry, so save it locally with the contents above. Open the WSO2 management console and click Registry -> Browse at the bottom of the navigation bar on the left. In the registry, navigate to /_system/governance, click on the governance node, then click Add Resource with:

Method: Upload Content From File
File: the XSLT file you saved above
Name: xmltocsv.xsl

Then press Add.

Step 3. Create a Smooks Config File

You need to create a Smooks config file; this tells Smooks how to parse the data in the CSV file. You can change this file to make the system parse pipe-delimited files, etc. Here is a simple one:

<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.0.xsd">
<!--Configure the CSVParser to parse the message into a stream of SAX events. -->
<resource-config selector="org.xml.sax.driver">
<resource>org.milyn.csv.CSVParser</resource>
<param name="fields" type="string-list">
name,surname,phone
</param>
</resource-config>
</smooks-resource-list>
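
For a row like Lakmali,Erandi,0771234567 (the phone value is made up), this parser produces XML along the lines of the following; the csv-set/csv-record element names are the Smooks 1.1 defaults as I recall them, so verify against your own output:

<csv-set>
  <csv-record>
    <name>Lakmali</name>
    <surname>Erandi</surname>
    <phone>0771234567</phone>
  </csv-record>
</csv-set>

This is exactly the kind of two-level structure the Step 2 stylesheet flattens back into CSV lines.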

Save the text above into a file called smooks-config.xml in <ESB_HOME>/resources/.

Then go into the ESB management tool and create a local entry for this file. Under Service Bus, select Local Entries and press Add. Select Add Source URL Entry with the following values:

Name: smooks
URL: file:resources/smooks-config.xml

Then save it. The sequence in the next step picks this entry up via config-key="smooks".

Step 4. Create the Output Sequence

In the WSO2 management console click Service Bus, then press Add Sequence at the top. On the Add Sequence page, switch to source view and paste the contents below into the editor.

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="fileWriteSequence" trace="enable" xmlns="http://ws.apache.org/ns/synapse">
  <property name="transport.vfs.ReplyFileName"
            expression="fn:concat(fn:substring-after(get-property('MessageID'), 'urn:uuid:'), '.txt')"
            scope="transport" xmlns:ns="http://org.apache.synapse/xsd"/>
  <property name="OUT_ONLY" value="true"/>
  <smooks config-key="smooks">
    <input type="text"/>
    <output type="xml"/>
  </smooks>
  <xslt key="gov:/xmltocsv.xsl"/>
  <send>
    <endpoint name="FileEpr">
      <address uri="vfs:file:///vfs/out"/>
    </endpoint>
  </send>
</sequence>

Just a quick overview of what this does…

1. Create a new file name for the output file based on the message ID (just to make it unique)
2. Run the Smooks config over the contents of the message that was sent to this sequence, creating an XML representation of the CSV input
3. Perform an XSLT transformation on the result of the Smooks step
4. Write the resulting text (the CSV) to file:///vfs/out (which is just a local folder on my machine)
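
If the output isn't what you expect, a handy debugging trick (not part of the original example) is to drop a log mediator into the sequence between the <smooks> and <xslt> steps so you can see the intermediate XML in the ESB logs:

<log level="full">
  <property name="STAGE" value="after-smooks"/>
</log>

The STAGE property is just a marker to make the log entry easy to find; remove the mediator once things work.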

Step 5. Create the Proxy Service

The proxy service is responsible for watching the source folder and starting the output sequence to perform the conversion.
In the ESB management console under Services click Add Proxy Service, then ‘Custom Proxy’, switch to source view and paste the following text:

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="FileProxy"
       transports="vfs"
       statistics="disable"
       trace="enable"
       startOnLoad="true">
  <target>
    <inSequence>
      <clone>
        <target sequence="fileWriteSequence"/>
      </clone>
    </inSequence>
  </target>
  <parameter name="transport.PollInterval">1</parameter>
  <parameter name="transport.vfs.FileURI">file:///vfs/original</parameter>
  <parameter name="transport.vfs.ContentType">text/plain</parameter>
  <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
  <parameter name="transport.vfs.MoveAfterProcess">file:///vfs/pass</parameter>
  <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
  <parameter name="transport.vfs.MoveAfterFailure">file:///vfs/failures</parameter>
  <parameter name="transport.vfs.FileNamePattern">.*\.txt</parameter>
  <description/>
</proxy>

There are two important pieces of information here. First, the inSequence, which clones the message and passes it to the fileWriteSequence we created a moment ago to do the real work. Second, the VFS transport parameters, which are responsible for watching the folder and moving files around.

The key parameters here are:

1. PollInterval: the polling interval, in seconds
2. FileURI: the folder to watch, plus FileNamePattern: the regex that file names must match
3. ActionAfterProcess: MOVE, sending successfully processed files to the MoveAfterProcess folder
4. ActionAfterFailure: MOVE, sending failed files to the MoveAfterFailure folder

So to run this you will need to create the folders: file:///vfs/original (the watch folder), file:///vfs/pass (successes) and file:///vfs/failures (failures). And don't forget to create file:///vfs/out for the output from the output sequence!
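
One optional extra, not part of the original example: adding a fault sequence inside the proxy's <target> logs mediation failures before the file is dealt with, which makes diagnosing bad input files much easier. A minimal sketch:

<faultSequence>
  <!-- log the failed message in full, then discard it -->
  <log level="full"/>
  <drop/>
</faultSequence>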

Try it out.

To try it out, just put a file like the one below into the watch folder (file:///vfs/original in my case) and make sure to name it so it matches the FileNamePattern. Note that these sample rows have five columns while the Smooks config above lists three fields; make the fields parameter match your file's columns.

Lakmali,Erandi,Female,20,SriLanka
Lakmali,Baminiwatta,Female,20,SriLanka

and you should get an output file in the output folder /vfs/out.

Things you can do with this…

This simple configuration is very flexible as you can do things like:

1. Change the Smooks config file to read any file, skip header rows, etc. (see the Smooks documentation)
2. Change the XSLT transform to output any format you like
3. Change the sequence to copy the data in the rows to a database (see the WSO2 sample in the references)
4. Change the sequence to send each row to a web service (see the sketch after this list)
5. Use VFS to export to FTP or SFTP services: <address uri="vfs:sftp://username:Password!@localhost:22/"/>
6. And so much more…
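
For option 4, a minimal sketch of the sequence change, assuming the csv-set/csv-record element names from the Smooks output and a hypothetical RowService endpoint:

<!-- split the Smooks XML into one message per record and post each one -->
<iterate expression="//csv-set/csv-record">
  <target>
    <sequence>
      <send>
        <endpoint>
          <address uri="http://localhost:8280/services/RowService"/>
        </endpoint>
      </send>
    </sequence>
  </target>
</iterate>

You would put this in fileWriteSequence in place of the <xslt> and <send> steps, so each record becomes its own message.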

But there is a gotcha…

The main issue with the above method is that it works like an ESB, not an ETL tool. Under the hood, the ESB creates an in-memory representation of the CSV file and wraps it in a message envelope to pass to the writing sequence. For large files (even with streaming turned on) this means a large memory overhead. What makes it worse is that the <xslt> mediator in the output sequence is a slow transform; we would rather use the <fastXSLT> mediator, but that only works on the incoming source stream, and you can't manipulate the message in any sequence if you use it. Streaming helps and minimises memory overhead, but the <xslt> mediator will still cause memory issues.

So what can we do? Well, the simplest option is a two-step process: one process extracts the information from the CSV file, and a second service either reads an intermediate XML file or receives each row through a web service. This approach, especially the web service option, offers more options for a transformation architecture, as we can make the intermediate service a 'business service' using a corporate canonical model and develop any new systems against that.

But I Just Want my ETL Process

If you just want to continue using the ETL process and don't care about the transition, there is an option for this too. In the example below we build on the example above and use Smooks alone to stream the contents of one CSV file to another, then use the WSO2 engine to provide scheduling and to manage the process threading and sequencing issues (which mainly exist because the bundled version of Smooks is 1.1 rather than 1.4+, which would save us from needing the file connector described below).

Note: I am assuming you have done all of the steps above before performing the steps in this streaming example. The process we are going to implement here is:

[Figure: wso2flow2, the streaming Smooks process flow]

Step 1. Create a New Smooks Configuration

This time the Smooks config file is a lot larger, as it's doing a lot more work for us. Save the text below into a file called smooks_configskuwarehouse.xml in <ESB_HOME>/resources/:

<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
    xmlns:file="http://www.milyn.org/xsd/smooks/file-routing-1.1.xsd"
    xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.2.xsd"
    xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd">
  <params>
    <param name="stream.filter.type">SAX</param>
  </params>
  <csv:reader fields="ParentSKU,AttributeSKU,WarehouseID,Published,Stock,SellingPrice,InventoryValue"
      rootElementName="rows" recordElementName="row" skipLines="1"/>
  <resource-config selector="row">
    <resource>org.milyn.delivery.DomModelCreator</resource>
  </resource-config>
  <ftl:freemarker applyOnElement="row">
    <ftl:template><![CDATA[${row.ParentSKU},${row.AttributeSKU},${row.WarehouseID},${row.Published},${row.Stock},${row.SellingPrice},${row.InventoryValue}
]]></ftl:template>
    <ftl:use>
      <ftl:outputTo outputStreamResource="outputStream"/>
    </ftl:use>
  </ftl:freemarker>
  <file:outputStream resourceName="outputStream" openOnElement="rows">
    <file:fileNamePattern>file_output.csv</file:fileNamePattern>
    <file:destinationDirectoryPattern>/vfs/outskuwarehouse</file:destinationDirectoryPattern>
  </file:outputStream>
</smooks-resource-list>
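
To make the behaviour concrete, here is a hypothetical input file (values made up) and what the configuration writes out:

ParentSKU,AttributeSKU,WarehouseID,Published,Stock,SellingPrice,InventoryValue
SKU001,SKU001-S,WH1,true,10,9.99,99.90
SKU001,SKU001-M,WH1,true,4,9.99,39.96

The header row is skipped (skipLines="1") and each remaining row is streamed through the Freemarker template, so /vfs/outskuwarehouse/file_output.csv ends up containing the two data rows unchanged. The point is that this happens row by row with near-constant memory, and the template is where you would reorder, drop or reformat fields.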

Then go into the ESB management tool and create a local entry for this file. Under Service Bus, select Local Entries and press Add. Select Add Source URL Entry with the following values:

Name: smooks_configskuwarehouse
URL: file:resources/smooks_configskuwarehouse.xml

Then save it.

What’s that do then?

OK, so this is a little more complex; here is what is going on:

  1. First we set the filter to SAX mode; this makes sure we are using streaming mode for file processing.
  2. Then we set up a CSV reader that will create XML chunks from the CSV, with a root node named <rows> and each row wrapped in a <row> element containing a tag per field.
  3. The resource-config entry says: create a simple Java model for each row (essentially a Java bean that lets the template access each field via row.<fieldname> when outputting the rows).
  4. The Freemarker template then says: each time a <row> appears in the incoming SAX stream, output it as a CSV line with the fields in the specified order.
  5. The use tag tells Freemarker not to write to the standard result stream but to the resource called 'outputStream' (see the next point). This matters because, unlike an XSLT transform, Freemarker outputs unmatched elements to the result stream by default, so you would end up with <rows></rows> tags wrapping your CSV content. It also means the output of this transform will not be sent back to the ESB, but we will solve that issue later!
  6. The final file:outputStream element matches the <rows> tag in the XML stream generated from the CSV file and opens a file at a known location so we can write our output there.

Note that the version of Smooks bundled here does not allow us to put any variables (other than fields from the input file) into the file name; Smooks 1.4 and above lets us embed timestamps into the output file name. This means we can only have this code executing once (in one thread), and we must move or rename the file before the next file comes along.

Install The WSO2 FileConnector

As we need to manipulate the file system, we need to install a connector to the WSO2 ESB to help out. This is reasonably easy; just follow the instructions below:

  1. Download the zip file from https://store.wso2.com/store/assets/esbconnector/f0550184-88c4-4d5e-b73f-ea51b1b5ab98 (note: this seems to fail to download in Safari)
  2. In the ESB management console select Connectors -> Add
  3. Select the downloaded zip file and press ok.

Done!

Create a New Proxy Service

In the ESB management console under Services click Add Proxy Service, then ‘Custom Proxy’, switch to source view and paste the following text:

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="FileProxySkuWarehouse"
       transports="vfs,http,https"
       statistics="enable"
       trace="disable"
       startOnLoad="true">
  <target>
    <inSequence>
      <property name="DISABLE_SMOOKS_RESULT_PAYLOAD"
                value="true"
                scope="default"
                type="STRING"/>
      <smooks config-key="smooks_configskuwarehouse">
        <input type="text"/>
        <output type="text"/>
      </smooks>
      <fileconnector.rename>
        <filelocation>file:///vfs/outskuwarehouse/</filelocation>
        <file>file_output.csv</file>
        <newfilename>{fn:concat(fn:substring-after(get-property('MessageID'), 'urn:uuid:'), '.txt')}</newfilename>
      </fileconnector.rename>
    </inSequence>
  </target>
  <parameter name="transport.vfs.Streaming">true</parameter>
  <parameter name="transport.PollInterval">2</parameter>
  <parameter name="transport.vfs.FileURI">file:///vfs/originalskuwarehouse</parameter>
  <parameter name="transport.vfs.ContentType">text/plain</parameter>
  <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
  <parameter name="transport.vfs.MoveAfterProcess">file:///vfs/passskuwarehouse</parameter>
  <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
  <parameter name="transport.vfs.MoveAfterFailure">file:///vfs/failuresskuwarehouse</parameter>
  <parameter name="transport.vfs.FileProcessCount">1</parameter>
  <parameter name="transport.vfs.FileNamePattern">.*\.txt</parameter>
  <parameter name="transport.vfs.MoveTimestampFormat">yyMMddHHmmssZ</parameter>
  <description/>
</proxy>

Important changes from the previous proxy:

  1. DISABLE_SMOOKS_RESULT_PAYLOAD: stops the output from Smooks being constructed in memory in the ESB message pipeline.
  2. transport.vfs.Streaming: tells VFS we want to stream this process, keeping memory utilisation low.
  3. transport.vfs.FileProcessCount: tells VFS to only process one file at a time, which avoids the problems with the single output file name we use in the Smooks config.
  4. A few more folders: you need to create these as before, one for source, one for success and one for failures.
  5. Smooks: note we are performing the Smooks transform in this proxy now, not in a sub-sequence. This was done just to keep the example simpler, as there is no real need for a second sequence here. But we are using the new Smooks config, not the simple one.
  6. File connector: this is the new bit. It moves the hard-coded output file (from where Smooks put it) into the same folder, but with a file name consisting of the message ID and a .txt extension.

Wow that seems simple!

Yes, this is now a very simple mediation; it's not very flexible, but it certainly is very efficient. The drawbacks come down to the Smooks version we are using, which forces us to output to a single known file name. That in turn makes us throttle the process count to 1 to ensure we do not corrupt this single file. This may not actually be a bad thing, especially on mechanical disks: with more than one file stream open, the disk heads would be under enormous pressure.

Conclusion

What I hope I have shown here is that using WSO2 (an ESB, not an ETL tool) we have a number of options when it comes to integration. We can use the 'ESB'-style transformation in option 1 to give us a flexible system, using business services to read from and output to a variety of different sources and destinations, or we can use the 'ETL' option 2 to get massively scalable data throughput.

What is important to note is that all of this has been done by configuring the tools, without any coding effort, which makes this a very flexible platform to use as a standard middleware layer should we so wish.

References

This example has been built from a number of different online examples, including:

https://docs.wso2.com/display/ESB490/Sample+271%3A+File+Processing

http://jayalalk.blogspot.co.uk/2014/04/wso2-esb-huge-message-processing-inside.html

https://docs.wso2.com/display/ESBCONNECTORS/Working+with+the+File+Connector#WorkingwiththeFileConnector-Sampleconfiguration

http://wso2.com/library/blog-post/2013/09/csv-to-xml-transformation-with-wso2-esb-smooks-mediator/

So thanks to all those authors!