*****ยังมีผิดพลาดอยู่ ต้องแก้ไข ตรง serialize protobuf ก่อนจะส่งเข้า MQ *****

ช่วงนี้นั่งว่างๆไม่มีอะไรน่าสนใจทำเท่าไหร่ แล้วบังเอิญได้ไปคุยเล่นๆกับพี่คนนึง ไปๆมาๆ คุยเรื่อง Performance ของระบบที่ต้องการ latency ต่ำ ผมก็เลยแนะนำว่าให้ใช้ Protobuf สิ โดยเราสามารถสร้าง Object ด้วย java ส่งผ่านช่องทางใดๆ แล้วก็สามารถใช้ภาษา ไม่ว่าจะเป็น python c ruby ในการอ่านแบบ object ได้ทันที (แนะนำทั้งๆที่ยังไม่เคยใช้ 555) อีกด้านหนึ่งก็คือผมเป็นคน anti xml สุดๆ เพราะว่ามันเปลือง resource ในการ processing มากมาย  แถมระบบทั้งหมดก็เป็น internal ไม่ได้ expose service ออกไปให้ใครอื่นใช้งาน ดังนั้นแล้วเพื่อความมั่นใจว่า solution เราใช้งานได้จริง ก็เลยต้องเอามาทดลองด้วยตัวเอง

ตัว Apache Camel นี้เป็น software ที่ทำหน้าที่สำหรับช่วยในการ routing message จาก A ไป B เช่นต้องการส่ง Message จาก JMS Queue ไปยัง Webservice เป็นต้น apache camel มี DSL ของมันเอง (Domain-specific language) ซึ่งในที่นี้ก็คือ Fluent API

ตัวอย่างของ DSL

from(“activemq:queue:jms_in_queue”).to(“activemq:queue:jms_out_queue”)

ใครเผลอเข้ามาอ่านคงตกใจ เมื่อก่อนเรานั่งเขียนอะไรยืดยาวไม่ต่ำกว่า 2 หน้ากระดาษ กว่าจะส่ง message  จาก queue A ไป queue B ได้ แต่นี่ด้วยความสามารถของ camel เราทำเสร็จได้ภายในบรรทัดเดียว

ActiveMQ เป็น message broker ซึ่งก็คือสื่อกลางในการติดต่อสื่อสารกันระหว่าง application ด้วยการนำ queue หรือ topic มาใช้งานนั่นเอง 

เริ่มแรกเราจะติดตั้ง ActiveMQ กันก่อนโดยการเข้าไปรัน activemq.bat ตัว activemq ก็จะมี web interface ให้เราเข้าไปแก้ไขสร้างเพิ่มเติม queue ได้ด้วยตัวเอง โดยผมแนะนำให้ใช้ webui ตัวใหม่ http://localhost:8161/hawtio (admin/admin)

หลังจาก login เข้าไปได้แล้วก็ให้ไปสร้าง queue มาสองตัวชื่อ jms_in_queue กับ jms_out_queue

หลังจากนั้นเราจะเริ่มมาเขียนโปรแกรมสำหรับการติดต่อ activemq ด้วย camel กัน

ผมใช้ netbeans 7.4 แบบ maven project

นี่คือรายชื่อ dependencies ที่เราจะใช้งาน (อาจจะมีเกินไปนิดหน่อย)

   <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-stomp</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-amqp</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>

หลังจาก netbeans ทำการ download dependencies มาให้เราหมดแล้ว ก็ให้ทำการ download protobuf https://code.google.com/p/protobuf/downloads/list

หลังจาก download และ extract แล้วก็ให้ทำการสร้าง java class ตามตัวอย่างต่อไปนี้

http://camel.apache.org/protobuf.html

โดยผมจะแก้ชื่อ package เป็น krisa เพื่อความสะดวก โดย class ที่เราได้ออกมาจะชื่อว่า AddressBookProtos ซึ่งมี class Person กับ AddressBook อยู่ข้างใน

หลังจากที่ลากไฟล์ java เข้ามาใน Project netbeans แล้วเราก็จะเริ่มมาลองเล่น camel กัน

 

CamelContext ctx = new DefaultCamelContext();

Context jndiContext = null;
ConnectionFactory cf = null;
Properties env = new Properties();
env.put(“java.naming.factory.initial”, “org.apache.activemq.jndi.ActiveMQInitialContextFactory”);
env.put(“java.naming.provider.url”, “tcp://localhost:61616”);
jndiContext = new InitialContext(env);
cf = (ConnectionFactory) jndiContext.lookup(“ConnectionFactory”);

ctx.addComponent(“activemq”, new AMQPComponent(cf));
ctx.addRoutes(new AMQ2AMQ());
ctx.start();

#ProducerTemplate

Thread.sleep(100000);
ctx.stop();

ใน Class App ส่วนของ public static void main เราจะต้องทำการสร้าง CamelContext หลังจะได้ context แล้วเราก็จะทำการ register component โดยในตัวอย่างนี้เราจะทำการ register AMQPComponent เข้าไปเพื่อไว้ใช้ในการติดต่อกับ activeMQ server ของเรา ถัดไปเราก็จะ register class AMQ2AMQ เข้าไปใน route engine ของ camel โดย class AMQ2AMQ นี่จะ extends class RouteBuilder

public class AMQ2AMQ extends RouteBuilder {

@Override
public void configure() throws Exception {…}

}

 

หน้าที่ของ route ก็คือการอ่านและส่งต่อ message จาก source A ไปยัง source B

ในตัวอย่างนี้ผมจะทำการสร้าง message ด้วย protobuf จากข้างต้น แล้วส่งเข้าไปยัง jms_in_queue

โดยเราจะใช้ component ที่ชื่อว่า direct อ้างอิงด้วย “start” component direct นี่จะใช้สำหรับการสื่อสารแบบ internal ใน app เท่านั้น และก่อนที่เราจะใช้ component นี้ได้ เราต้องทำการสร้าง Producer ที่จะเป็นตัวยิง message เข้าในนี้ก่อนด้วย code ข้างล่าง (นำไปใส่ไว้ใน class App #ProducerTemplate)

ProducerTemplate template = ctx.createProducerTemplate();

template.sendBody(“direct:start”, “some message”);

ใน class AMQ2AMQ ของเราก็จะทำการอ่าน Message จาก “direct:start” แล้วทำการ process message ใหม่โดยการ setBody เป็น object java (protobuf) ของเรา จะเห็นว่าเราทำการสร้าง Processor ตัวใหม่ ซึ่งมี method process โดยมี arg เป็น Exchange ซึ่ง Exchange นี้ให้คิดว่ามันเป็นบุรษไปรษณ์ย ที่มี ซองจดหมายซึ่งมี header มี body ตามลำดับ ที่นี้ เราก็จะทำการแก้ไข body ของจดหมาย โดยผมจะปล่อยซองขาออก “out” ไว้เฉยๆ จะแก้แต่ “in” (ทำตาม camel in action) เท่านั้น หลังจากนั้นเราก็จะส่ง message นี้กลับไปยัง jms_in_queue ด้วย activemq ที่เราเซทไว้ก่อนหน้า

from(“direct:start”).process(new Processor() {

public void process(Exchange exchng) throws Exception {
Person john
= Person.newBuilder()
.setId(1234)
.setName(“John Doe”)
.setEmail(“jdoe@example.com”)
.addPhone(
Person.PhoneNumber.newBuilder()
.setNumber(“555-4321”)
.setType(Person.PhoneType.HOME))
.build();
System.out.println(“here”);
exchng.getIn().setBody(john);
}
}).to(“activemq:queue:jms_in_queue”);

ตอนนี้ถ้าเข้าไปดูที่ ActiveMQ console ก็จะเห็น Message ค้างอยู่ใน queue jms_in_queue ของเราแล้ว

สำหรับการเอา message ออกจาก queue ก็ไม่ได้แตกต่างจากเดิมนัก คือ from activemq:queue:jms_in_queue นั่นเอง แล้วเราก็จะทำการ process โดยการ getBody ของ “In” แล้วแปลงเป็น class Person ตามตัวอย่างข้างล่าง

from(“activemq:queue:jms_in_queue”).process(new Processor() {

public void process(Exchange exchng) throws Exception {

Person person = exchng.getIn().getBody(krisa.AddressBookProtos.Person.class);
System.out.println(person.getName());
}
}).to(“activemq:queue:jms_out_queue”);

นี่คือตัวอย่างง่ายๆในการใช้ protobuf แทนการสื่อสารแบบ xml ที่ efficient กว่ามาก เหมาะกับ internal communication ที่ต้องการ overhead ต่ำ และความเร็วที่เร็วกว่า xml หลายสิบเท่านัก