View Javadoc
1   package gboat2.cxf.businessImpl;
2   
3   import gboat2.base.core.dao.QuerySupport;
4   import gboat2.base.core.service.BaseService;
5   import gboat2.cxf.Constant;
6   import gboat2.cxf.business.IWebServiceConfigBusiness;
7   import gboat2.cxf.model.WebServiceConfig;
8   import gboat2.cxf.model.WebServiceConfigParam;
9   
10  import java.io.IOException;
11  import java.net.InetAddress;
12  import java.net.ServerSocket;
13  import java.net.UnknownHostException;
14  import java.util.Dictionary;
15  import java.util.HashMap;
16  import java.util.Hashtable;
17  import java.util.List;
18  import java.util.Map;
19  
20  import org.apache.commons.lang3.StringUtils;
21  import org.osgi.framework.BundleContext;
22  import org.osgi.framework.Constants;
23  import org.osgi.framework.Filter;
24  import org.osgi.framework.InvalidSyntaxException;
25  import org.osgi.framework.ServiceReference;
26  import org.osgi.framework.ServiceRegistration;
27  import org.osgi.util.tracker.ServiceTracker;
28  import org.osgi.util.tracker.ServiceTrackerCustomizer;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.springframework.osgi.context.BundleContextAware;
32  import org.springframework.transaction.annotation.Transactional;
33  
34  @Transactional
35  public class WebServiceConfigBusinessImpl extends BaseService implements IWebServiceConfigBusiness, BundleContextAware,
36          ServiceTrackerCustomizer {
37  	
38  	private Logger logger = LoggerFactory.getLogger(WebServiceConfigBusinessImpl.class);
39  	
40  	private BundleContext bc;
41  	
42  	private volatile boolean initAddressSign = Boolean.FALSE;
43  	
44  	private String hostName = null;
45  	
46  	private String port = null;
47  	
48  	private static Map<String, ServiceRegistration<?>> publishedWebServices = new Hashtable<String, ServiceRegistration<?>>();
49  	
50  	//所有要发布成webserivce的接口都需要被监听,需监听的列表存储在被数据Map中
51  	private static Map<String, WebServiceConfig> interfacesToListening = new Hashtable<String, WebServiceConfig>();
52  	
53  	private static ServiceTracker tracker;
54  	
55  	private static byte[] lock = new byte[0];
56  	
57  	private void initInterfacesToListening() {
58  		//		boolean isNotActive = false;
59  		//		for (Bundle b : bc.getBundles()) {
60  		//			if (b.getSymbolicName().equals("gboat2.base.dao")) {
61  		//				if ((b.getState() & Bundle.ACTIVE) == 0) {
62  		//					isNotActive = true;
63  		//				}
64  		//			}
65  		//		}
66  		List<WebServiceConfig> webSerConfigList = getActiveWebServiceConfigs();
67  		
68  		//		if (isNotActive || null == webSerConfigList || webSerConfigList.size() == 0) {
69  		//			System.out.print("dao还没起来");
70  		//		}
71  		
72  		//缓存所有要发布的接口
73  		if (null != webSerConfigList) {
74  			for (WebServiceConfig config : webSerConfigList) {
75  				interfacesToListening.put(config.getInterfaceName(), config);
76  			}
77  		}
78  	}
79  	
80  	@SuppressWarnings({ "rawtypes", "unchecked" })
81      @Override
82  	public Object addingService(ServiceReference ref) {
83  		String[] interfaceNames = (String[]) ref.getProperty(Constants.OBJECTCLASS);
84  		boolean isPublishedService = (null != ref.getProperty(EXPORTED_INTERFACES)); //过滤掉已发布服务导致的add事件
85  		
86  		if (!isPublishedService && null != interfaceNames && interfaceNames.length != 0) {
87  			WebServiceConfig config = interfacesToListening.get(interfaceNames[0]);
88  			if (null != config) {//刚添加的服务在监视列表中,则发布该服务
89  				publishService(config);
90  			}
91  		}
92  		return bc.getService(ref);
93  	}
94  	
95  	@SuppressWarnings("rawtypes")
96      @Override
97  	public void modifiedService(ServiceReference sref, Object service) {
98  	}
99  	
100 	@SuppressWarnings("rawtypes")
101     @Override
102 	public void removedService(ServiceReference sref, Object service) {
103 		String[] interfaceNames = (String[]) sref.getProperty(Constants.OBJECTCLASS);
104 		boolean isPublishedService = null != sref.getProperty(EXPORTED_INTERFACES); //过滤掉已发布服务导致的remove事件
105 		if (!isPublishedService && null != interfaceNames && interfaceNames.length != 0) {
106 			String interfaceName = interfaceNames[0];
107 			if (interfacesToListening.containsKey(interfaceName)) { // 发布的服务被删除,登记的服务需要移除
108 				publishedWebServices.remove(interfaceName);
109 			}
110 		}
111 	}
112 	
113 	@SuppressWarnings({ "unchecked", "rawtypes" })
114     @Override
115 	public void startPublishAndTrackWebService() {
116 	    // 只在第一次实例化 bean 对象时打开追踪器
117 		synchronized (lock) {//锁定tracker再进行判断
118 			if (null != tracker) {
119 				return;
120 			}
121 		}
122 		
123 		initInterfacesToListening();
124 		
125 		Filter filter = null;
126 		try {
127 			filter = bc.createFilter("(objectClass=*)");
128 		} catch (InvalidSyntaxException e) {
129 			logger.error("CXF webService 监听过滤器语法错误", e);
130 			throw new IllegalArgumentException("unexpected InvalidSyntaxException: " + e.getMessage());
131 		}
132 		tracker = new ServiceTracker(bc, filter, this);
133 		tracker.open();
134 	}
135 	
136 	//	public boolean publishAllServicesIf() {
137 	//		boolean isPublished = true;
138 	//		List<WebServiceConfig> webSerConfigList = getActiveWebServiceConfigs();
139 	//		if (null != webSerConfigList) {
140 	//			for (WebServiceConfig config : webSerConfigList) {
141 	//				//if (!serviceRegs.containsKey(config.getInterfaceName())) {
142 	//				ServiceRegistration tempSerReg = publishService(config);
143 	//				if (tempSerReg == null) {
144 	//					isPublished = false;
145 	//				}
146 	//				//}
147 	//			}
148 	//		}
149 	//		
150 	//		return isPublished;
151 	//	}
152 	
153 	private List<WebServiceConfig> getActiveWebServiceConfigs() {
154 		Map<String, Object> params = new HashMap<String, Object>();
155 		params.put(QuerySupport.PARAM_TABLENAME, WebServiceConfig.class.getName());
156 		//params.put("_status", Constant.WEBSERVICE_STATUS_ACTIVE);
157 		return (List<WebServiceConfig>) super.query(params);
158 	}
159 	
160 	/**
161 	 * 本方法可以在cxf osgi bundle启动前调用,因为该bundle启动后会扫描发布的服务并启动他们。故在调用本方法前无需检测
162 	 * cxf osgi bundle是否已经启动
163 	 */
164 	public ServiceRegistration publishService(WebServiceConfig config) {
165 		if (StringUtils.isEmpty(config.getInterfaceName())) {
166 			logger.error("webserivce 记录中interfaceName为空,configId:" + config.getConfigId());
167 			return null;
168 		}
169 		String newAddress = getWebserviceReleaseAddress() + config.getRelativeAddress();
170 		Dictionary<String, String> props = new Hashtable<String, String>();
171         props.put(EXPORTED_INTERFACES, "*");
172         props.put(EXPORTED_CONFIGS, "org.apache.cxf.ws");
173         props.put(WS_ADDRESS, newAddress);
174 
175         ServiceRegistration<?> serviceReg = publishedWebServices.get(config.getInterfaceName());
176         if (null == serviceReg) { // 未曾发布过
177             ServiceReference<?> serviceRef = bc.getServiceReference(config.getInterfaceName());
178             Object serviceObj = bc.getService(serviceRef);
179             if (serviceObj != null) {
180                 // 注册服务
181                 serviceReg = bc.registerService(config.getInterfaceName(), serviceObj, props);
182             }
183         } else { // 已经发布过了,则更新
184             // if (!newAddress.equals(serviceReg.getReference().getProperty(REMOTE_POJO_ADDREASS_PROPERTY))) {// 地址更改过
185             if (!newAddress.equals(serviceReg.getReference().getProperty(WS_ADDRESS))) { // 地址更改过
186                 Object objImpl = bc.getService(serviceReg.getReference());
187                 serviceReg.unregister(); // 注销原来发布过的服务
188                 serviceReg = bc.registerService(config.getInterfaceName(), objImpl, props);// 重新注册成服务
189             }
190         }
191 		//添加到注册列表
192 		publishedWebServices.put(config.getInterfaceName(), serviceReg);
193 		
194 		if (!Constant.WEBSERVICE_STATUS_ACTIVE.equals(config.getStatus())) {
195 			config.setStatus(Constant.WEBSERVICE_STATUS_ACTIVE); // 设定库中数据状态为激活
196 			update(config);
197 		}
198 		
199 		// 发布后的服务需要加入到监听列表,以确保相应服务重启后会自动发布
200 		interfacesToListening.put(config.getInterfaceName(), config);
201 		return serviceReg;
202 	}
203 	
204 	public void unpublishService(WebServiceConfig config) {
205 		Dictionary props = new Hashtable();
206 		interfacesToListening.remove(config.getInterfaceName()); //必须在重新注册服务前移除监听列表,否则可能导致重新发布
207 		ServiceReference serviceRef = bc.getServiceReference(config.getInterfaceName());
208 		if (null != serviceRef) {
209 			Object objImpl = bc.getService(serviceRef);
210 			ServiceRegistration regist = publishedWebServices.get(config.getInterfaceName());
211 			if (null != regist) {
212 				regist.unregister();
213 				bc.registerService(config.getInterfaceName(), objImpl, props);//注销后需要重新注册成普通服务
214 			}
215 		}
216 		publishedWebServices.remove(config.getInterfaceName());
217 		
218 		if (!Constant.WEBSERVICE_STATUS_INACTIVE.equals(config.getStatus())) {
219 			config.setStatus(Constant.WEBSERVICE_STATUS_INACTIVE);//设定库中数据状态为不激活
220 			update(config);
221 		}
222 	}
223 	
224 	@Override
225 	public boolean delete(Object po) {
226 		this.unpublishService((WebServiceConfig) po);
227 		this.baseDAO.delete(po);
228 		return true;
229 	}
230 	
231 	public void setBundleContext(BundleContext bct) {
232 		bc = bct;
233 	}
234 	
235 	//	public void unpublishAllServices() {
236 	//		if (bc.getBundle().getState() == Bundle.ACTIVE) {
237 	//			Set<String> regKeys = publishedWebServices.keySet();
238 	//			for (String key : regKeys)
239 	//				unpublishService(key);
240 	//		}
241 	//	}
242 	
243 	public WebServiceConfigParam getWebSerConfParam(String key) {
244 		Map<String, Object> params = new HashMap<String, Object>();
245 		WebServiceConfigParam returnParam = null;
246 		params.put("TABLENAME", WebServiceConfigParam.class.getName());
247 		params.put("_paramKey", key);
248 		List webSerConfParams = super.query(params);
249 		if ((webSerConfParams != null) && (webSerConfParams.size() > 0)) {
250 			returnParam = (WebServiceConfigParam) webSerConfParams.get(0);
251 		}
252 		return returnParam;
253 	}
254 	
255 	/**
256 	 * 
257 	 * 获取web service发布地址
258 	 * @return
259 	 */
260 	private String getWebserviceReleaseAddress() {
261 		initWebServiceAddress();
262 		return hostName + ":" + port + "/";
263 	}
264 	
265 	/**
266 	 * 初始化web service地址
267 	 */
268 	private void initWebServiceAddress() {
269 		if (!initAddressSign) {
270 			WebServiceConfigParam mainAddressParam = getWebSerConfParam(SITE_ADDRESS_KEY);
271 			WebServiceConfigParam portParam = getWebSerConfParam(PORT_KEY);
272 			//域名(IP)处理
273 			if (null != mainAddressParam) {
274 				hostName = mainAddressParam.getParamValue();
275 			} else {
276 				hostName = "http://" + getHost();
277 				WebServiceConfigParam newAddressParam = new WebServiceConfigParam();
278 				newAddressParam.setParamKey(SITE_ADDRESS_KEY);
279 				newAddressParam.setParamValue(hostName);
280 				super.save(newAddressParam);
281 			}
282 			//端口处理
283 			if (null != portParam) {
284 				//port = portParam.getParamValue();
285 				port = getEnabledPort(portParam.getParamValue());
286 				if (port != null && !portParam.getParamValue().equals(port)) {
287 					portParam.setParamValue(port);
288 					super.update(portParam);
289 				}
290 			} else {
291 				//port = PORT_VALUE;
292 				port = getEnabledPort(PORT_VALUE);
293 				WebServiceConfigParam newPortParam = new WebServiceConfigParam();
294 				newPortParam.setParamKey(PORT_KEY);
295 				newPortParam.setParamValue(port);
296 				super.save(newPortParam);
297 			}
298 			initAddressSign = true;
299 		}
300 	}
301 	
302 	//获取合理port
303 	private String getEnabledPort(String port) {
304 		//先试默认的
305 		String returnPort = tryPortFree(port);
306 		/*Integer nextPort = Integer.parseInt(port);
307 		//如果传入的系统当前配置端口不合法,则试试比其大1的端口
308 		while (null == returnPort) {
309 			nextPort = nextPort + 1;
310 			if (null != tryPortFree(nextPort + "")) {
311 				returnPort = nextPort + "";
312 			}
313 		}*/
314 		return returnPort;
315 	}
316 	
317 	//查看某个端口是否可用
318 	private String tryPortFree(String port) {
319 		int p = Integer.parseInt(port);
320 		ServerSocket s = null;
321 		try {
322 			s = new ServerSocket(p);
323 			return "" + s.getLocalPort();
324 		} catch (IOException e) {
325 			logger.error("port " + port + " is already used.");
326 			return null;
327 		} finally {
328 			if (s != null)
329 				try {
330 					s.close();
331 				} catch (IOException e) {
332 					e.printStackTrace();
333 				}
334 		}
335 	}
336 	
337 	/**
338 	 * 获取当前主机名
339 	 * @return
340 	 */
341 	private String getHost() {
342 		try {
343 			InetAddress addr = InetAddress.getLocalHost();
344 			return addr.getHostAddress();
345 		} catch (UnknownHostException e) {
346 			e.printStackTrace();
347 		}
348 		return null;
349 	}
350 	
351 	@Override
352 	public boolean decideCurrentService(String expPckNamesString, String serviceName) {
353 		String servicePackageName = serviceName.substring(0, serviceName.lastIndexOf("."));
354 		if (expPckNamesString.indexOf(servicePackageName) >= 0) {
355 			return true;
356 		}
357 		return false;
358 	}
359 	
360 }