基于java中BlockingQueue的使用介绍
时间:2022-02-04 08:53:50|栏目:JAVA代码|点击: 次
最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些Java的终极粉丝,总是号称性能已经不必C++差,并且很多标准类库都是大师级的人写的,如何如何稳定等等。索性就认真研究一番,他们给我的一项说明就是,在线程之间投递消息,用java已经封装好的BlockingQueue,就足够用了。
既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试:
//默认包
import java.util.concurrent.*;
import base.MyRunnable;
public class Test
{
public static void main(String[] args)
{
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
java.lang.Runnable r = new MyRunnable(queue);
Thread t = new Thread(r);
t.start();
while(true)
{
try
{
while(true)
{
for(int i =0;i < 10000;i++)
{
queue.offer(i);
}
}
}
catch ( Exception e)
{
e.printStackTrace();
}
}
}
}
//需要添加的包
package base;
import java.lang.Runnable;
import java.util.concurrent.*;
import java.util.*;
public class MyRunnable implements Runnable
{
public MyRunnable(BlockingQueue<Integer> queue)
{
this.queue = queue;
}
public void run()
{
Date d = new Date();
long starttime = d.getTime();
System.err.println(starttime);
int count = 0;
while(true)
{
try
{
Integer i = this.queue.poll();
if(i != null)
{
count ++;
}
if(count == 100000)
{
Date e = new Date();
long endtime = e.getTime();
System.err.println(count);
System.err.println(endtime);
System.err.print(endtime - starttime);
break;
}
}
catch (Exception e)
{
}
}
}
private BlockingQueue<Integer> queue;
}
传递十万条数据,在我的测试机上面,大概需要50ms左右,倒是还可以!索性就看了一下BlockingQueue的底层实现
我在上面的测试代码中使用的offer 和 poll,就看看这两个实现函数吧,首先是offer
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
和一般的同步线程类似,只是多加了一个signal,在学习unix环境高级编程时候,看到条件变量用于线程之间的同步,可以实现线程以竞争的方式实现同步!
poll函数的实现也是类似!
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试:
复制代码 代码如下:
//默认包
import java.util.concurrent.*;
import base.MyRunnable;
public class Test
{
public static void main(String[] args)
{
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
java.lang.Runnable r = new MyRunnable(queue);
Thread t = new Thread(r);
t.start();
while(true)
{
try
{
while(true)
{
for(int i =0;i < 10000;i++)
{
queue.offer(i);
}
}
}
catch ( Exception e)
{
e.printStackTrace();
}
}
}
}
//需要添加的包
package base;
import java.lang.Runnable;
import java.util.concurrent.*;
import java.util.*;
public class MyRunnable implements Runnable
{
public MyRunnable(BlockingQueue<Integer> queue)
{
this.queue = queue;
}
public void run()
{
Date d = new Date();
long starttime = d.getTime();
System.err.println(starttime);
int count = 0;
while(true)
{
try
{
Integer i = this.queue.poll();
if(i != null)
{
count ++;
}
if(count == 100000)
{
Date e = new Date();
long endtime = e.getTime();
System.err.println(count);
System.err.println(endtime);
System.err.print(endtime - starttime);
break;
}
}
catch (Exception e)
{
}
}
}
private BlockingQueue<Integer> queue;
}
传递十万条数据,在我的测试机上面,大概需要50ms左右,倒是还可以!索性就看了一下BlockingQueue的底层实现
我在上面的测试代码中使用的offer 和 poll,就看看这两个实现函数吧,首先是offer
复制代码 代码如下:
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
和一般的同步线程类似,只是多加了一个signal,在学习unix环境高级编程时候,看到条件变量用于线程之间的同步,可以实现线程以竞争的方式实现同步!
poll函数的实现也是类似!
复制代码 代码如下:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
上一篇:Java AtomicInteger类使用方法实例讲解
栏 目:JAVA代码
本文标题:基于java中BlockingQueue的使用介绍
本文地址:http://www.codeinn.net/misctech/192308.html


阅读排行
- 1Java Swing组件BoxLayout布局用法示例
- 2java中-jar 与nohup的对比
- 3Java邮件发送程序(可以同时发给多个地址、可以带附件)
- 4Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type异常
- 5Java中自定义异常详解及实例代码
- 6深入理解Java中的克隆
- 7java读取excel文件的两种方法
- 8解析SpringSecurity+JWT认证流程实现
- 9spring boot里增加表单验证hibernate-validator并在freemarker模板里显示错误信息(推荐)
- 10深入解析java虚拟机




