1 /*
2 * Copyright 1999-2006 University of Chicago
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "globus_i_notification_producer.h"
18 #include "globus_wsrf_core_tools.h"
19 #include "globus_xml_buffer.h"
20 #include "libxml/uri.h"
21 #include "NotificationConsumerService_client.h"
22 #include "wsnt_NotificationMessageHolderType.h"
23 #include "wstop_TopicNamespaceType.h"
24 #include "wstop_TopicType.h"
25 #include "wstop_SimpleTopicExpression.h"
26 #include "wsnt_TopicExpression.h"
27 #include "wsnt_ProducerProperties.h"
28 #include "wsnt_MessageContent.h"
29
30 #include "version.h"
31
32 #ifndef GLOBUS_DONT_DOCUMENT_INTERNAL
33
34 /* Local Prototypes */
/* Forward declarations for file-local helpers. Most definitions live later
 * in this file; notes below describe only how the visible code uses them.
 */
35 0 GlobusDebugDefine(GLOBUS_NOTIFICATION_PRODUCER);
36
/* Module lifecycle hooks: activate, deactivate and atexit are the three
 * function slots of globus_i_notification_producer_module.
 */
37 static
38 int
39 globus_l_notification_producer_activate(void);
40
41 static
42 int
43 globus_l_notification_producer_deactivate(void);
44
45 static
46 void
47 globus_l_notification_producer_atexit(void);
48
/* NOTE(review): presumably run once via
 * globus_l_subcription_manager_activate_once -- confirm at the call site.
 */
49 static
50 void
51 globus_l_notification_activate_subscription_manager(void);
52
/* Equality test used in this file on pairs of wsa_EndpointReferenceType
 * pointers; callers treat a non-zero return as "same subscription EPR".
 */
53 static
54 int
55 globus_l_notification_epr_keyequal(
56 void * epr1,
57 void * epr2);
58
/* NOTE(review): looks like the hash function paired with
 * globus_l_notification_epr_keyequal for the EPR-keyed table -- confirm.
 */
59 static
60 int
61 globus_l_notification_epr_hash(
62 void * key,
63 int limit);
64
65 static
66 globus_result_t
67 globus_l_notification_epr_get_reference_properties(
68 const wsa_EndpointReferenceType * epr,
69 char ** reference_properties);
70
/* Resolves a subscription EPR to its producer and subscription resource.
 * NOTE(review): globus_notification_producer_destroy_subscription() unlocks
 * the returned producer afterwards, so this appears to return with the
 * producer lock held -- confirm against the definition.
 */
71 static
72 globus_result_t
73 globus_l_notification_producer_find_subscription(
74 const wsa_EndpointReferenceType * subscription_epr,
75 globus_i_notification_producer_t ** producer,
76 globus_resource_t * subscription);
77
78
/* Called by globus_notification_producer_subscribe() to create the resource
 * backing a new subscription and to fill in the subscription EPR.
 */
79 static
80 globus_result_t
81 globus_l_notification_subscription_create(
82 const globus_notification_producer_t
83 producer,
84 const globus_service_engine_t engine,
85 const wsnt_SubscribeType * Subscribe,
86 wsa_EndpointReferenceType * epr,
87 globus_resource_t * resource);
88
/* Registered through globus_resource_set_resource_specific() as the
 * destructor for the per-subscription state attached to a resource.
 */
89 static
90 void
91 globus_l_notification_subscription_free(
92 void * arg);
93
/* Completion callback with the signature of a
 * NotificationConsumerService client operation.
 */
94 static
95 void
96 globus_l_notify_done(
97 NotificationConsumerService_client_handle_t
98 handle,
99 void * callback_arg,
100 globus_result_t result);
101
102 static
103 void
104 globus_l_notification_callback_kickout(
105 void * user_arg);
106
/* NOTE(review): by name, a local timegm() replacement converting a broken
 * down time to time_t -- confirm timezone handling in the definition.
 */
107 static
108 time_t
109 globus_l_notification_timegm(
110 const struct tm * tm);
111
112 /* Local Variables */
113
/* XML namespace URI for WS-BaseNotification (b-2). */
114 #define WSN_NS "http://docs.oasis-open.org/wsn/b-2"
115
/* Service path of the PausableSubscriptionManagerService. */
116 #define PAUSABLESUBSCRIPTIONMANAGERSERVICE_BASE_PATH \
117 "wsrf/services/PausableSubscriptionManagerService"
118
/* Topic expression dialect URIs compared against a TopicExpression's
 * _Dialect attribute. Topic create/destroy accept Simple and Concrete;
 * subscribe and get_current_message additionally accept Full.
 */
119 #define SIMPLE_TOPIC_EXPRESSION \
120 "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple"
121
122 #define CONCRETE_TOPIC_EXPRESSION \
123 "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Concrete"
124
125 #define FULL_TOPIC_EXPRESSION \
126 "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Full"
127
/* NOTE(review): presumably the mutex behind
 * GlobusNotificationProducerGlobalLock()/GlobalUnlock() -- confirm in the
 * internal header.
 */
128 globus_mutex_t
129 globus_i_notification_producer_lock;
130
/* Maps a subscription EPR to its globus_i_notification_subscription_entry_t
 * (see the insert in globus_notification_producer_subscribe()).
 */
131 globus_hashtable_t
132 globus_i_notification_subscriptions;
133
/* Maps a resource_id string to its notification producer state. */
134 globus_hashtable_t
135 globus_i_notification_producers;
136
/* One-time initialization control; the identifier's spelling
 * ("subcription") is kept because it is referenced by name.
 */
137 static
138 globus_thread_once_t
139 globus_l_subcription_manager_activate_once = GLOBUS_THREAD_ONCE_INIT;
140
141 static
142 globus_service_descriptor_t *
143 globus_l_notification_SubscriptionManager_service_descriptor;
144
145 static
146 globus_extension_handle_t
147 globus_l_notification_SubscriptionManager_handle;
148
/* Module descriptor: module name, then the activate, deactivate and atexit
 * functions declared above, followed by the version record.
 * NOTE(review): the two NULL slots are unused optional hooks -- confirm
 * their meaning against the globus_module_descriptor_t definition.
 */
149 globus_module_descriptor_t
150 globus_i_notification_producer_module =
151 {
152 "globus_notification_producer",
153 globus_l_notification_producer_activate,
154 globus_l_notification_producer_deactivate,
155 globus_l_notification_producer_atexit,
156 NULL,
157 &local_version,
158 NULL
159 };
160
161 #endif /* GLOBUS_DONT_DOCUMENT_INTERNAL */
162
163 /**
164 * Create new notification producer state.
165 * @ingroup producer
166 *
167 * This function creates new notification producer state, addressed by the
168 * given @a resource_id string. This state may be manipulated to create new
169 * subscriptions and topics.
170 *
171 * @param resource_id
172 * Unique resource string which is used to identify this producer. This
173 * is typically related to the reference properties for a resource. This
174 * string value is required to identify the state in further notification
175 * calls.
176 * @param callback
177 * Callback function to be called whenever a topic is created,
178 * changed or destroyed.
179 * @param callback_arg
180 * Application-specific parameter to the callback function.
181 *
182 * @retval GLOBUS_SUCCESS
183 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
184 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
185 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_ALREADY_EXISTS
186 */
187 globus_result_t
188 globus_notification_producer_create(
189 const char * resource_id,
190 globus_notification_producer_topic_callback_t
191 callback,
192 void * callback_arg)
193 248 {
194 248 globus_result_t result = GLOBUS_SUCCESS;
195 int rc;
196 globus_notification_producer_t producer;
197
198 248 if (resource_id == NULL)
199 {
200 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
201
202 0 goto out;
203 }
204 248 GlobusNotificationProducerGlobalLock();
205
206 248 producer = globus_hashtable_lookup(
207 &globus_i_notification_producers,
208 (void *) resource_id);
209
210 248 if (producer != NULL)
211 {
212 /* Producer already exists. We'll just set the callback/callback_arg
213 * in it and be done
214 */
215 8 GlobusNotificationProducerLock(producer);
216 16 if (producer->callback == NULL && callback != NULL)
217 {
218 8 producer->callback = callback;
219 8 producer->callback_arg = callback_arg;
220
221 8 producer->callback(
222 producer->callback_arg,
223 producer->resource_id,
224 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_CREATED,
225 NULL);
226 }
227 else
228 {
229 0 result = GLOBUS_NOTIFICATION_PRODUCER_ALREADY_EXISTS();
230 }
231 8 GlobusNotificationProducerUnlock(producer);
232 8 GlobusNotificationProducerGlobalUnlock();
233
234 8 goto out;
235 }
236
237 240 producer = calloc(1, sizeof(globus_i_notification_producer_t));
238
239 240 if (producer == NULL)
240 {
241 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
242
243 0 goto out;
244 }
245
246 240 producer->callback = callback;
247 240 producer->callback_arg = callback_arg;
248
249 240 producer->resource_id = globus_libc_strdup(resource_id);
250 240 if (producer->resource_id == NULL)
251 {
252 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
253
254 0 goto free_producer;
255 }
256
257 240 wstop_TopicNamespaceType_array_init(&producer->topic_set);
258
259 240 rc = globus_mutex_init(&producer->lock, NULL);
260 240 if (rc != GLOBUS_SUCCESS)
261 {
262 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
263
264 0 goto destroy_topic_set_out;
265 }
266
267 240 rc = globus_hashtable_insert(
268 &globus_i_notification_producers,
269 producer->resource_id,
270 producer);
271 240 if (rc != GLOBUS_SUCCESS)
272 {
273 0 result = GLOBUS_NOTIFICATION_PRODUCER_ALREADY_EXISTS();
274
275 0 goto unlock_out;
276 }
277
278 240 GlobusNotificationProducerGlobalUnlock();
279
280 240 return result;
281
282 0 unlock_out:
283 0 GlobusNotificationProducerGlobalUnlock();
284 0 destroy_topic_set_out:
285 0 wstop_TopicNamespaceType_array_destroy(producer->topic_set);
286 0 free(producer->resource_id);
287 0 free_producer:
288 0 free(producer);
289 8 out:
290 8 return result;
291 }
292 /* globus_notification_producer_create() */
293
294 /**
295 * Destroy notification producer state
296 * @ingroup producer
297 *
298 * This function destroys all producer state associated with a resource. All
299 * topics and subscriptions related to this producer are destroyed as well.
300 *
301 * @param resource_id
302 * Unique resource string which is used to identify this producer. This
303 * is typically related to the reference properties for a resource.
304 *
305 * @retval GLOBUS_SUCCESS
306 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
307 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
308 */
309 globus_result_t
310 globus_notification_producer_destroy(
311 const char * resource_id)
312 0 {
313 globus_notification_producer_t producer;
314 0 globus_result_t result = GLOBUS_SUCCESS;
315
316 0 if (resource_id == NULL)
317 {
318 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
319
320 0 goto out;
321 }
322
323 0 GlobusNotificationProducerGlobalLock();
324
325 0 producer = globus_hashtable_remove(
326 &globus_i_notification_producers,
327 (void *) resource_id);
328
329 0 if (producer == NULL)
330 {
331 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
332
333 0 goto unlock_error;
334 }
335
336 0 GlobusNotificationProducerLock(producer);
337 0 wstop_TopicNamespaceType_array_destroy(producer->topic_set);
338 0 GlobusNotificationProducerUnlock(producer);
339
340 0 globus_mutex_destroy(&producer->lock);
341 0 GlobusNotificationProducerGlobalUnlock();
342
343 0 free(producer->resource_id);
344
345 0 free(producer);
346
347 0 return GLOBUS_SUCCESS;
348
349 0 unlock_error:
350 0 GlobusNotificationProducerGlobalUnlock();
351 0 out:
352 0 return result;
353 }
354 /* globus_notification_producer_destroy() */
355
356 /**
357 * Create a Notification Topic
358 * @ingroup notification_topics
359 * Adds a new topic to the TopicSet for a particular notification producer
360 * resource. This new topic may be subscribed to by calling
361 * globus_notification_producer_subscribe(), and
362 * change notifications may be sent to subscribers by calling
363 * globus_notification_producer_topic_changed().
364 *
365 * @param resource_id
366 * Resource id associated with notification producer state.
367 * @param topic_expression
368 * An expression indicating the name (or path) of this topic. Currently
369 * only simple topic expressions are supported
370 * (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
371 *
372 * @retval GLOBUS_SUCCESS
373 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
374 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_DIALECT
375 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
376 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
377 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_ALREADY_EXISTS
378 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_INVALID_TOPIC
379 */
380 globus_result_t
381 globus_notification_producer_topic_create(
382 const char * resource_id,
383 const wsnt_TopicExpressionType * topic_expression)
384 1860 {
385 globus_notification_producer_t producer;
386 1860 globus_result_t result = GLOBUS_SUCCESS;
387
388 1860 if (resource_id == NULL || topic_expression == NULL)
389 {
390 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
391
392 0 goto out;
393 }
394
395 1860 if (strcmp(topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
396 strcmp(topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0)
397 {
398 0 result = GLOBUS_NOTIFICATION_PRODUCER_DIALECT_UNKNOWN();
399
400 0 goto out;
401 }
402
403 1860 GlobusNotificationProducerGlobalLock();
404
405 1860 producer = globus_hashtable_lookup(
406 &globus_i_notification_producers,
407 (void *) resource_id);
408
409 1860 if (producer)
410 {
411 1860 GlobusNotificationProducerLock(producer);
412 1860 GlobusNotificationProducerGlobalUnlock();
413 }
414 else
415 {
416 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
417
418 0 GlobusNotificationProducerGlobalUnlock();
419 0 goto out;
420 }
421
422 1860 result = globus_i_notification_topic_set_add(
423 producer->topic_set,
424 topic_expression);
425
426 1860 if (result != GLOBUS_SUCCESS)
427 {
428 0 goto unlock_producer_out;
429 }
430
431 1860 if (producer->callback)
432 {
433 1844 producer->callback(
434 producer->callback_arg,
435 producer->resource_id,
436 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_CREATED,
437 topic_expression);
438 }
439
440 1860 unlock_producer_out:
441 1860 GlobusNotificationProducerUnlock(producer);
442
443 1860 out:
444 1860 return result;
445 }
446 /* globus_notification_producer_topic_create() */
447
448
449 /**
450 * Destroy a Notification Topic
451 * @ingroup notification_topics
452 * Destroys all information related to a previously existing topic associated
453 * with a notification producer. The topic and all of the related susbsription
454 * resources are removed from the service.
455 *
456 * @param resource_id
457 * Resource id associated with notification producer state.
458 * @param topic_expression
459 * An expression indicating the name (or path) of this topic. Currently
460 * only simple topic expressions are supported
461 * (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
462 *
463 * @retval GLOBUS_SUCCESS
464 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
465 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_DIALECT
466 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
467 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_TOPIC
468 */
469 globus_result_t
470 globus_notification_producer_topic_destroy(
471 const char * resource_id,
472 const wsnt_TopicExpressionType * topic_expression)
473 3 {
474 3 globus_result_t result = GLOBUS_SUCCESS;
475 globus_notification_producer_t producer;
476
477 3 if (resource_id == NULL || topic_expression == NULL)
478 {
479 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
480
481 0 goto out;
482 }
483 3 if (strcmp(topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
484 strcmp(topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0)
485 {
486 0 result = GLOBUS_NOTIFICATION_PRODUCER_DIALECT_UNKNOWN();
487
488 0 goto out;
489 }
490
491 3 GlobusNotificationProducerGlobalLock();
492
493 3 producer = globus_hashtable_lookup(
494 &globus_i_notification_producers,
495 (void *) resource_id);
496 3 if (producer)
497 {
498 3 GlobusNotificationProducerLock(producer);
499 }
500 else
501 {
502 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
503 0 GlobusNotificationProducerGlobalUnlock();
504
505 0 goto out;
506 }
507
508 3 result = globus_i_notification_topic_set_remove(
509 producer->topic_set,
510 topic_expression);
511
512 3 if (result != GLOBUS_SUCCESS)
513 {
514 0 goto unlock_out;
515 }
516 3 if (producer->callback)
517 {
518 3 producer->callback(
519 producer->callback_arg,
520 producer->resource_id,
521 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_DESTROYED,
522 topic_expression);
523 }
524
525 3 unlock_out:
526 3 GlobusNotificationProducerGlobalUnlock();
527 3 GlobusNotificationProducerUnlock(producer);
528 3 out:
529 3 return result;
530 }
531 /* globus_notification_producer_topic_destroy() */
532
533 /**
534 * Subscribe to a topic handled by a notification producer.
535 * @ingroup subscription
536 *
537 * @param resource_id
538 * Resource id associated with notification producer state.
539 * @param engine
540 * A pointer to the WS engine which will serve requests for operations
541 * on the resulting NotificationManager subscription
542 * @param Subscribe
543 * Parameters related to the subscription. These are generally passed
544 * as parameters to the WSN Subscribe operation, but may be generated
545 * programmatically by a service. Currently only the @a ConsumerReference,
546 * @a TopicExpression, and @a UseNotify elements are handled. All others
547 * are ignored by this implementation.
548 * @param SubscribeResponse
549 * Pointer to a response structure which will be populated by this
550 * implementation. The contents of this structure will be initialized and
551 * set if the subscription is created successfully.
552 *
553 * @retval GLOBUS_SUCCESS
554 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
555 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
556 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_UNKNOWN_TOPIC
557 */
558 globus_result_t
559 globus_notification_producer_subscribe(
560 const char * resource_id,
561 const globus_service_engine_t engine,
562 const wsnt_SubscribeType * Subscribe,
563 wsnt_SubscribeResponseType * SubscribeResponse)
564 35 {
565 globus_resource_t subscription_resource;
566 35 globus_result_t result = GLOBUS_SUCCESS;
567 globus_notification_producer_t producer;
568 35 globus_list_t * topic_list = NULL;
569 globus_list_t * tmp;
570 globus_i_notification_producer_topic_t *
571 topic;
572 globus_i_notification_subscription_t *
573 subscription;
574 globus_i_notification_subscription_entry_t *
575 subscription_entry;
576 wsa_EndpointReferenceType * tmp_epr;
577 35 wsnt_TopicExpressionType * topic_expression = NULL;
578 int rc;
579 int i;
580
581 /* Make sure all parameters make sense */
582 35 if (resource_id == NULL || Subscribe == NULL
583 || Subscribe->Filter == NULL || SubscribeResponse == NULL)
584 {
585 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
586
587 0 goto out;
588 }
589
590 /* Validate subscription Filter components */
591 134 for (i = 0; i < Subscribe->Filter->any.length; i++)
592 {
593 35 if (xsd_QName_keyeq(Subscribe->Filter->any.elements[i].element,
594 &wsnt_TopicExpression_qname))
595 {
596 35 topic_expression = Subscribe->Filter->any.elements[i].value;
597
598 35 if (strcmp(topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
599 strcmp(topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0 &&
600 strcmp(topic_expression->_Dialect, FULL_TOPIC_EXPRESSION) != 0)
601 {
602 3 result = GLOBUS_NOTIFICATION_PRODUCER_DIALECT_UNKNOWN();
603
604 3 goto out;
605 }
606 32 topic_expression = Subscribe->Filter->any.elements[i].value;
607
608 32 if (Subscribe->Filter->any.elements[i].any_info !=
609 &wsnt_TopicExpressionType_info)
610 {
611 0 result =
612 GLOBUS_NOTIFICATION_PRODUCER_INVALID_TOPIC_EXPRESSION();
613 }
614 }
615 0 else if (xsd_QName_keyeq(Subscribe->Filter->any.elements[i].element,
616 &wsnt_ProducerProperties_qname))
617 {
618 0 result = GLOBUS_NOTIFICATION_PRODUCER_INVALID_FILTER(
619 &wsnt_ProducerProperties_qname);
620
621 0 goto out;
622 }
623 0 else if (xsd_QName_keyeq(Subscribe->Filter->any.elements[i].element,
624 &wsnt_MessageContent_qname))
625 {
626 0 result = GLOBUS_NOTIFICATION_PRODUCER_INVALID_FILTER(
627 &wsnt_MessageContent_qname);
628
629 0 goto out;
630 }
631 0 else if (Subscribe->Filter->any.elements[i].element != NULL)
632 {
633 0 result = GLOBUS_NOTIFICATION_PRODUCER_INVALID_FILTER(
634 Subscribe->Filter->any.elements[i].element);
635
636 0 goto out;
637 }
638 else
639 {
640 0 result = GLOBUS_NOTIFICATION_PRODUCER_SUBSCRIPTION_CREATION_FAILED();
641
642 0 goto out;
643 }
644 }
645 32 if (topic_expression == NULL)
646 {
647 0 result = GLOBUS_NOTIFICATION_PRODUCER_SUBSCRIPTION_CREATION_FAILED();
648
649 0 goto out;
650 }
651
652 32 GlobusNotificationProducerGlobalLock();
653
654 32 producer = globus_hashtable_lookup(
655 &globus_i_notification_producers,
656 (void *) resource_id);
657
658 32 if (producer == NULL)
659 {
660 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
661
662 0 GlobusNotificationProducerGlobalUnlock();
663
664 0 goto out;
665 }
666
667 32 GlobusNotificationProducerLock(producer);
668 32 GlobusNotificationProducerGlobalUnlock();
669
670 32 result = globus_i_notification_topic_set_lookup(
671 producer->topic_set,
672 topic_expression,
673 &topic_list);
674
675 32 if (topic_list == NULL)
676 {
677 3 if (result == GLOBUS_SUCCESS)
678 {
679 0 result = GLOBUS_NOTIFICATION_PRODUCER_TOPIC_NOT_SUPPORTED();
680 }
681 3 goto unlock_out;
682 }
683
684 /* Internal data containing info about this subscription */
685 29 subscription = malloc(
686 sizeof(globus_i_notification_subscription_t));
687 29 if (subscription == NULL)
688 {
689 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
690
691 0 goto free_topic_list_out;
692 }
693
694 29 result = wsa_EndpointReferenceType_copy(
695 &subscription->consumer_epr,
696 &Subscribe->ConsumerReference);
697 29 if (result != GLOBUS_SUCCESS)
698 {
699 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
700
701 0 goto free_subscription_out;
702 }
703 29 subscription->paused = GLOBUS_FALSE;
704
705 29 result = wsnt_SubscribeResponseType_init_contents(SubscribeResponse);
706
707 29 if (result != GLOBUS_SUCCESS)
708 {
709 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
710
711 0 goto free_subscription_consumer_out;
712 }
713
714 /* Create resource associated with this subscription */
715 29 result = globus_l_notification_subscription_create(
716 producer,
717 engine,
718 Subscribe,
719 &SubscribeResponse->SubscriptionReference,
720 &subscription_resource);
721
722 29 if (result != GLOBUS_SUCCESS)
723 {
724 0 goto free_subscription_epr_out;
725 }
726
727 29 result = globus_resource_set_resource_specific(
728 subscription_resource,
729 subscription,
730 globus_l_notification_subscription_free);
731 29 if (result != GLOBUS_SUCCESS)
732 {
733 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
734
735 0 goto destroy_subscription_resource_out;
736 }
737
738 29 subscription_entry =
739 malloc(sizeof(globus_i_notification_subscription_entry_t));
740 29 if (subscription_entry == NULL)
741 {
742 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
743 0 goto destroy_subscription_resource_out;
744 }
745 29 subscription_entry->producer = producer;
746 29 result = wsa_EndpointReferenceType_copy(
747 &subscription_entry->epr,
748 &SubscribeResponse->SubscriptionReference);
749
750 29 if (result != GLOBUS_SUCCESS)
751 {
752 0 goto free_subscription_entry_out;
753 }
754 29 subscription->topics = NULL;
755
756 /* Add mapping of subscription to topics and vice versa */
757 29 tmp = topic_list;
758 87 while (!globus_list_empty(tmp))
759 {
760 29 topic = globus_list_first(tmp);
761
762 29 if (topic->topic.any->any_info ==
763 &wstop_SimpleTopicExpression_contents_info)
764 {
765 20 xsd_QName *qn = topic->topic.any->value;
766 }
767 else
768 {
769 9 xsd_token *tok = topic->topic.any->value;
770 }
771
772 /* Insert topic pointer into subscription's state */
773 29 globus_list_insert(&subscription->topics, topic);
774
775 /* Add reference to subscription's EPR to the topic */
776 29 tmp_epr = wsa_EndpointReferenceType_array_push(&topic->subscriptions);
777 29 if (tmp_epr == NULL)
778 {
779 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
780
781 0 goto remove_from_topic_list_out;
782 }
783 /* Make a copy of the EPR for usage within this API. The SubscribeResponse
784 * will be freed by the service engine after it is sent as part of the
785 * SOAP response.
786 */
787 29 result = wsa_EndpointReferenceType_copy_contents(
788 tmp_epr,
789 &SubscribeResponse->SubscriptionReference);
790 29 if (result != GLOBUS_SUCCESS)
791 {
792 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
793
794 0 goto remove_from_topic_list_out;
795 }
796 29 tmp = globus_list_rest(tmp);
797 }
798 29 GlobusNotificationProducerUnlock(producer);
799
800 29 GlobusNotificationProducerGlobalLock();
801
802 /* Add entry in the map of subscription EPRs to producer structures */
803 29 rc = globus_hashtable_insert(
804 &globus_i_notification_subscriptions,
805 subscription_entry->epr,
806 subscription_entry);
807 29 GlobusNotificationProducerGlobalUnlock();
808
809 29 if (rc != GLOBUS_SUCCESS)
810 {
811 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
812
813 0 goto remove_from_topic_list_out;
814 }
815 29 if (topic_list)
816 {
817 29 globus_list_free(topic_list);
818 }
819
820 29 globus_resource_finish(subscription_resource);
821
822 29 return GLOBUS_SUCCESS;
823
824 0 remove_from_topic_list_out:
825 0 for (i = 0; i < topic->subscriptions.length; i++)
826 {
827 0 if (globus_l_notification_epr_keyequal(
828 &topic->subscriptions.elements[i],
829 subscription_entry->epr))
830 {
831 0 wsa_EndpointReferenceType_destroy_contents(&topic->subscriptions.elements[i]);
832 0 memmove(&topic->subscriptions.elements[i],
833 &topic->subscriptions.elements[i+1],
834 sizeof(wsa_EndpointReferenceType) *
835 (topic->subscriptions.length - (i+1)));
836 0 topic->subscriptions.length--;
837 0 break;
838 }
839 }
840 0 globus_list_free(subscription->topics);
841 0 wsa_EndpointReferenceType_destroy(subscription_entry->epr);
842 0 free_subscription_entry_out:
843 0 free(subscription_entry);
844 0 destroy_subscription_resource_out:
845 0 globus_resource_destroy(subscription_resource);
846 0 free_subscription_epr_out:
847 0 wsa_EndpointReferenceType_destroy_contents(
848 &SubscribeResponse->SubscriptionReference);
849 0 free_subscription_consumer_out:
850 0 wsa_EndpointReferenceType_destroy(subscription->consumer_epr);
851 0 free_subscription_out:
852 0 free(subscription);
853 0 free_topic_list_out:
854 0 globus_list_free(topic_list);
855 3 unlock_out:
856 3 GlobusNotificationProducerUnlock(producer);
857 6 out:
858 6 return result;
859 }
860 /* globus_notification_producer_subscribe() */
861
862 /**
863 * Destroy a subscription resource.
864 * @ingroup subscription
865 *
866 * Destroys the SubscriptionManager resource at the named location, as well
867 * as freeing all memory associated with the subscription. Subscriptions
868 * which are destroyed will not have any additional notification sent to them,
869 * though an in-progress notification may arrive after the subscription is
870 * destroyed.
871 *
872 * When the topic or notification producer state which was used to create the
873 * subscription is destroyed, the subscription will automatically be destroyed.
874 *
875 * @param subscription_epr
876 * Pointer to the endpoint reference to the SubscriptionManager resource.
877 *
878 * @retval GLOBUS_SUCCESS
879 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
880 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
881 */
882 globus_result_t
883 globus_notification_producer_destroy_subscription(
884 const wsa_EndpointReferenceType * subscription_epr)
885 0 {
886 globus_result_t result;
887 globus_notification_producer_t producer;
888 globus_resource_t resource;
889 globus_i_notification_producer_topic_t *
890 topic;
891 globus_i_notification_subscription_t *
892 subscription;
893 globus_i_notification_subscription_entry_t *
894 subscription_entry;
895 int i;
896 globus_list_t * tmp;
897
898 0 result = globus_l_notification_producer_find_subscription(
899 subscription_epr,
900 &producer,
901 &resource);
902
903 0 if (result != GLOBUS_SUCCESS)
904 {
905 0 goto out;
906 }
907
908 0 subscription_entry = globus_hashtable_remove(
909 &globus_i_notification_subscriptions,
910 (void *) subscription_epr);
911
912 0 wsa_EndpointReferenceType_destroy(subscription_entry->epr);
913 0 free(subscription_entry);
914
915 0 result = globus_resource_get_resource_specific(
916 resource,
917 (void **) &subscription);
918
919 0 if (result != GLOBUS_SUCCESS || subscription == NULL)
920 {
921 /* Should NEVER happen */
922 0 globus_assert(result == GLOBUS_SUCCESS);
923 0 globus_assert(subscription != GLOBUS_NULL);
924
925 /* We fudge a little bit here: the destroy is essentially done at
926 * this point, but for some things which shouldn't happen. From the
927 * application's point of view, the destruction was successful unless
928 * this is built with debug :)
929 */
930 0 result = GLOBUS_SUCCESS;
931
932 0 goto unlock_out;
933 }
934
935 /* Make sure the subscription is removed from all topics it was associated with */
936 0 tmp = subscription->topics;
937 0 while (tmp != NULL)
938 {
939 0 topic = globus_list_first(tmp);
940 0 for (i = 0; i < topic->subscriptions.length; i++)
941 {
942 0 if (globus_l_notification_epr_keyequal(
943 &topic->subscriptions.elements[i],
944 (void*) subscription_epr))
945 {
946 0 wsa_EndpointReferenceType_destroy_contents(
947 &topic->subscriptions.elements[i]);
948 0 memmove(&topic->subscriptions.elements[i],
949 &topic->subscriptions.elements[i+1],
950 sizeof(wsa_EndpointReferenceType) *
951 (topic->subscriptions.length - (i+1)));
952 0 topic->subscriptions.length--;
953 }
954 }
955 0 tmp = globus_list_rest(tmp);
956 }
957
958 0 unlock_out:
959 0 globus_resource_destroy(resource);
960 0 GlobusNotificationProducerUnlock(producer);
961 0 out:
962 0 return result;
963 }
964 /* globus_notification_producer_destroy_subscription() */
965
966 /**
967 * Get Current Message
968 * @ingroup notification_topics
969 *
970 * Returns the last message change sent via a call to
971 * globus_notification_producer_topic_changed() for a particular topic
972 * associated with a producer resource.
973 *
974 * @param resource_id
975 * Resource id associated with notification producer state.
976 * @param topic_expression
977 * An expression indicating the name (or path) of this topic. Currently
978 * only simple topic expressions are supported
979 * (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
980 * @param any
981 * A pointer to an xsd:any which will be populated with a copy of the last
982 * message sent for the given @a topic_expression. If no changes have been
983 * sent for this topic, then the returned @a any will not contain any data.
984 *
985 * @retval GLOBUS_SUCCESS
986 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
987 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_DIALECT_UNKNOWN
988 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_TOPIC
989 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
990 */
991 globus_result_t
992 globus_notification_producer_get_current_message(
993 const char * resource_id,
994 const wsnt_TopicExpressionType * topic_expression,
995 xsd_any * any)
996 30 {
997 30 globus_result_t result = GLOBUS_SUCCESS;
998 globus_notification_producer_t producer;
999 30 globus_list_t * topic_list = NULL;
1000 globus_list_t * tmp_list;
1001 globus_i_notification_producer_topic_t *
1002 topic;
1003 xsd_any_array * aa;
1004 xsd_any * new_any;
1005
1006 30 if (any == NULL || resource_id == NULL || topic_expression == NULL)
1007 {
1008 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1009
1010 0 goto out;
1011 }
1012 30 result = xsd_any_init_contents(any);
1013
1014 30 if (strcmp(topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
1015 strcmp(topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0 &&
1016 strcmp(topic_expression->_Dialect, FULL_TOPIC_EXPRESSION) != 0)
1017 {
1018 3 result = GLOBUS_NOTIFICATION_PRODUCER_DIALECT_UNKNOWN();
1019
1020 3 goto out;
1021 }
1022
1023 27 GlobusNotificationProducerGlobalLock();
1024
1025 27 producer = globus_hashtable_lookup(
1026 &globus_i_notification_producers,
1027 (void *) resource_id);
1028
1029 27 if (producer == NULL)
1030 {
1031 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1032
1033 0 GlobusNotificationProducerGlobalUnlock();
1034
1035 0 goto out;
1036 }
1037
1038 27 GlobusNotificationProducerLock(producer);
1039 27 GlobusNotificationProducerGlobalUnlock();
1040
1041 27 result = globus_i_notification_topic_set_lookup(
1042 producer->topic_set,
1043 topic_expression,
1044 &topic_list);
1045
1046 27 if (topic_list == NULL)
1047 {
1048 3 goto unlock_out;
1049 }
1050
1051 24 if (result != GLOBUS_SUCCESS)
1052 {
1053 0 goto unlock_out;
1054 }
1055
1056 24 if (globus_list_size(topic_list) > 1)
1057 {
1058 0 result = xsd_any_array_init(&aa);
1059 0 any->value = aa;
1060 0 any->any_info = &xsd_any_array_info;
1061
1062 0 tmp_list = topic_list;
1063 0 while (!globus_list_empty(tmp_list))
1064 {
1065 0 topic = globus_list_first(tmp_list);
1066 0 if (topic->current_message != NULL && topic->current_message->value != NULL)
1067 {
1068 0 new_any = xsd_any_array_push(aa);
1069 0 if (new_any == NULL)
1070 {
1071 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1072 0 goto destroy_any_out;
1073 }
1074 0 new_any->any_info = topic->current_message->any_info;
1075 0 result = new_any->any_info->copy(
1076 (void **) &new_any->value,
1077 topic->current_message->value);
1078 0 if (result != GLOBUS_SUCCESS)
1079 {
1080 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1081
1082 0 goto destroy_any_out;
1083 }
1084 }
1085 0 tmp_list = globus_list_rest(tmp_list);
1086 }
1087 }
1088 else
1089 {
1090 24 topic = globus_list_first(topic_list);
1091 24 if (topic->current_message != NULL && topic->current_message->value != NULL)
1092 {
1093 21 any->any_info = topic->current_message->any_info;
1094 21 result = any->any_info->copy(
1095 (void **) &any->value,
1096 topic->current_message->value);
1097 21 if (result != GLOBUS_SUCCESS)
1098 {
1099 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1100
1101 0 goto destroy_any_out;
1102 }
1103 }
1104 }
1105
1106 24 globus_list_free(topic_list);
1107
1108 24 destroy_any_out:
1109 24 if (result != GLOBUS_SUCCESS)
1110 {
1111 0 xsd_any_destroy(any);
1112 }
1113 27 unlock_out:
1114 27 GlobusNotificationProducerUnlock(producer);
1115 30 out:
1116 30 return result;
1117 }
1118 /* globus_notification_producer_get_current_message() */
1119
1120 /**
1121 * Send Topic Change Notifications
1122 * @ingroup notification_topics
1123 *
1124 * For each subscription to the given topic_expression for the given resource,
1125 * the Notify operation will be invoked on the ConsumerReference associated
1126 * with the subscription. When all notifications have been sent, the @a
1127 * notification_done callback will be invoked locally.
1128 *
1129 * The @a message will be copied by the library when the notification begins,
1130 * so the caller may free the value immediately upon returning from this
1131 * function.
1132 *
1133 * @param resource_id
1134 * Resource id associated with notification producer state.
1135 * @param topic_expression
1136 * An expression indicating the name (or path) of this topic. Currently
1137 * only simple topic expressions are supported
1138 * (http://docs.oasis-open.org/wsn/2004/06/TopicExpression/Simple)
1139 * @param message
1140 * Contents of the notification message.
1141 * @param notification_done
1142 * Optional done callback. If this is non-NULL, then the callback will
1143 * be called when all of the notification operations have completed, for
1144 * implementations which would like to enforce sequencing notifications.
1145 * @param callback_arg
1146 * Optional parameter to the @a notification_done callback.
1147 *
1148 * @retval GLOBUS_SUCCESS
1149 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1150 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
1151 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_TOPIC
1152 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
1153 */
1154 globus_result_t
1155 globus_notification_producer_topic_changed(
1156 const char * resource_id,
1157 const wsnt_TopicExpressionType * topic_expression,
1158 const xsd_any * message,
1159 globus_notification_done_callback_t notification_done,
1160 void * callback_arg)
1161 433 {
1162 globus_result_t result;
1163 globus_notification_producer_t producer;
1164 globus_list_t * topic_list;
1165 globus_i_notification_producer_topic_t *
1166 topic;
1167 globus_i_notification_callback_info_t *
1168 callback_info;
1169 int i;
1170 NotificationConsumerService_client_handle_t
1171 handle;
1172 wsa_EndpointReferenceType * subscription_epr;
1173 globus_resource_t resource;
1174 globus_i_notification_subscription_t *
1175 subscription;
1176 wsnt_NotificationMessageHolderType *notify_message;
1177 433 globus_bool_t callback_registered = GLOBUS_FALSE;
1178
1179 433 if (resource_id == NULL || topic_expression == NULL || message == NULL)
1180 {
1181 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1182
1183 0 goto out;
1184 }
1185
1186 433 if (strcmp(topic_expression->_Dialect, SIMPLE_TOPIC_EXPRESSION) != 0 &&
1187 strcmp(topic_expression->_Dialect, CONCRETE_TOPIC_EXPRESSION) != 0)
1188 {
1189 0 result = GLOBUS_NOTIFICATION_PRODUCER_DIALECT_UNKNOWN();
1190
1191 0 goto out;
1192 }
1193
1194 433 GlobusNotificationProducerGlobalLock();
1195 433 producer = globus_hashtable_lookup(
1196 &globus_i_notification_producers,
1197 (void *) resource_id);
1198
1199 433 if (producer == NULL)
1200 {
1201 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1202
1203 0 goto unlock_notification_out;
1204 }
1205
1206 433 GlobusNotificationProducerLock(producer);
1207
1208 433 result = globus_i_notification_topic_set_lookup(
1209 producer->topic_set,
1210 topic_expression,
1211 &topic_list);
1212
1213 433 if (topic_list == NULL)
1214 {
1215 0 goto unlock_out;
1216 }
1217 433 topic = globus_list_first(topic_list);
1218 433 globus_list_free(topic_list);
1219
1220 433 xsd_any_destroy(topic->current_message);
1221 433 topic->current_message = NULL;
1222
1223 /* non-fatal if this fails, so we'll ignore result */
1224 433 (void) xsd_any_copy(&topic->current_message, message);
1225
1226
1227 433 callback_info = malloc(
1228 sizeof(globus_i_notification_callback_info_t));
1229 433 if (callback_info == NULL)
1230 {
1231 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1232
1233 0 goto unlock_out;
1234 }
1235 433 callback_info->callback = notification_done;
1236 433 callback_info->callback_arg = callback_arg;
1237 433 callback_info->topic_expression = topic_expression;
1238 433 callback_info->notification_count = 0;
1239 433 result = wsnt_NotifyType_init(&callback_info->Notify);
1240
1241 433 if (result != GLOBUS_SUCCESS)
1242 {
1243 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1244
1245 0 goto free_callback_info_out;
1246 }
1247
1248 433 notify_message = wsnt_NotificationMessageHolderType_array_push(
1249 &callback_info->Notify->NotificationMessage);
1250
1251 433 if (notify_message == NULL)
1252 {
1253 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1254
1255 0 goto free_notify_out;
1256 }
1257
1258 433 result = wsnt_TopicExpressionType_copy(
1259 &notify_message->Topic,
1260 topic_expression);
1261 433 if (result != GLOBUS_SUCCESS)
1262 {
1263 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1264
1265 0 goto free_notify_out;
1266 }
1267 433 notify_message->ProducerReference = NULL;
1268
1269 433 result = xsd_any_copy_contents(
1270 &notify_message->Message.any,
1271 message);
1272 433 if (result != GLOBUS_SUCCESS)
1273 {
1274 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1275
1276 0 goto free_notify_out;
1277 }
1278
1279 433 globus_mutex_init(&callback_info->lock, NULL);
1280 433 globus_mutex_lock(&callback_info->lock);
1281
1282 433 if (topic->subscriptions.length == 0)
1283 {
1284 /* no subscriptions at all, if we have a callback to call, we'll
1285 * register a oneshot to handle it, otherwise clean up the callback
1286 * info.
1287 */
1288 356 if (notification_done != NULL)
1289 {
1290 0 result = globus_callback_register_oneshot(
1291 NULL,
1292 &globus_i_reltime_zero,
1293 globus_l_notification_callback_kickout,
1294 callback_info);
1295 0 if (result == GLOBUS_SUCCESS)
1296 {
1297 0 callback_registered = GLOBUS_TRUE;
1298 }
1299 else
1300 {
1301 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1302 }
1303 }
1304 356 goto unlock_callback_info_out;
1305 }
1306
1307 157 for (i = 0; i < topic->subscriptions.length; i++)
1308 {
1309 80 subscription_epr = &topic->subscriptions.elements[i];
1310
1311 80 result = globus_wsrf_core_get_resource_from_epr(
1312 subscription_epr,
1313 &resource);
1314
1315 80 if (result != GLOBUS_SUCCESS)
1316 {
1317 /* skip dead subscription for this topic */
1318 0 continue;
1319 }
1320
1321 80 result = globus_resource_get_resource_specific(
1322 resource,
1323 (void **) &subscription);
1324
1325 80 if (result != GLOBUS_SUCCESS || subscription == NULL)
1326 {
1327 /* skip broken subscription for this topic */
1328 0 globus_resource_finish(resource);
1329
1330 0 continue;
1331 }
1332
1333 80 if (subscription->paused)
1334 {
1335 /* skip paused subscription */
1336 0 globus_resource_finish(resource);
1337
1338 0 continue;
1339 }
1340
1341 80 result = NotificationConsumerService_client_init(
1342 &handle,
1343 NULL,
1344 NULL);
1345
1346 80 if (result != GLOBUS_SUCCESS)
1347 {
1348 0 globus_resource_finish(resource);
1349
1350 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1351
1352 0 goto unlock_callback_info_out;
1353 }
1354
1355 80 result = Consumer_Notify_epr_register_request(
1356 handle,
1357 subscription->consumer_epr,
1358 callback_info->Notify,
1359 globus_l_notify_done,
1360 callback_info);
1361 80 globus_resource_finish(resource);
1362 80 if (result == GLOBUS_SUCCESS)
1363 {
1364 80 callback_info->notification_count++;
1365 80 callback_registered = GLOBUS_TRUE;
1366 }
1367 else
1368 {
1369 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1370
1371 0 goto unlock_callback_info_out;
1372 }
1373 }
1374
1375 77 if (producer->callback)
1376 {
1377 77 producer->callback(
1378 producer->callback_arg,
1379 producer->resource_id,
1380 GLOBUS_NOTIFICATION_PRODUCER_TOPIC_CHANGED,
1381 topic_expression);
1382 }
1383
1384 433 unlock_callback_info_out:
1385 433 globus_mutex_unlock(&callback_info->lock);
1386 433 free_notify_out:
1387 433 if (!callback_registered)
1388 {
1389 356 wsnt_NotifyType_destroy(callback_info->Notify);
1390 356 free_callback_info_out:
1391 356 free(callback_info);
1392 }
1393 433 unlock_out:
1394 433 GlobusNotificationProducerUnlock(producer);
1395 433 unlock_notification_out:
1396 433 GlobusNotificationProducerGlobalUnlock();
1397 433 out:
1398 433 return result;
1399 }
1400 /* globus_notification_producer_topic_changed() */
1401
1402
1403 /**
1404 * Pause a Subscription
1405 * @ingroup subscription
1406 * Disable sending any notification messages to the consumer associated with
1407 * a subscription resource until the subscription is resumed by calling
1408 * globus_notification_subscription_resume().
1409 *
1410 * @param subscription_manager
1411 * Pointer to the endpoint reference to the SubscriptionManager resource.
1412 *
1413 * @retval GLOBUS_SUCCESS
1414 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1415 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
1416 */
1417 globus_result_t
1418 globus_notification_subscription_pause(
1419 const wsa_EndpointReferenceType * subscription_manager)
1420 0 {
1421 globus_result_t result;
1422 globus_notification_producer_t producer;
1423 globus_resource_t resource;
1424 globus_i_notification_subscription_t *
1425 subscription;
1426
1427 0 if (subscription_manager == NULL)
1428 {
1429 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1430
1431 0 goto out;
1432 }
1433 0 result = globus_l_notification_producer_find_subscription(
1434 subscription_manager,
1435 &producer,
1436 &resource);
1437
1438 0 if (result != GLOBUS_SUCCESS)
1439 {
1440 0 goto out;
1441 }
1442
1443 0 result = globus_resource_get_resource_specific(
1444 resource,
1445 (void **) &subscription);
1446
1447 0 if (result != GLOBUS_SUCCESS)
1448 {
1449 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1450
1451 0 goto unlock_out;
1452 }
1453
1454 0 subscription->paused = GLOBUS_TRUE;
1455
1456 0 unlock_out:
1457 0 globus_resource_finish(resource);
1458 0 GlobusNotificationProducerUnlock(producer);
1459 0 out:
1460 0 return result;
1461 }
1462 /* globus_notification_subscription_pause() */
1463
1464 /**
1465 * Resume a Subscription
1466 * @ingroup subscription
1467 * Reenable sending notification messages to the consumer associated with
1468 * a subscription resource which was paused by by calling
1469 * globus_notification_subscription_pause().
1470 *
1471 * @param subscription_manager
1472 * Pointer to the endpoint reference to the SubscriptionManager resource.
1473 *
1474 * @retval GLOBUS_SUCCESS
1475 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1476 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_UNKNOWN_RESOURCE
1477 */
1478 globus_result_t
1479 globus_notification_subscription_resume(
1480 const wsa_EndpointReferenceType * subscription_manager)
1481 0 {
1482 globus_result_t result;
1483 globus_notification_producer_t producer;
1484 globus_resource_t resource;
1485 globus_i_notification_subscription_t *
1486 subscription;
1487
1488 0 if (subscription_manager == NULL)
1489 {
1490 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1491
1492 0 goto out;
1493 }
1494 0 result = globus_l_notification_producer_find_subscription(
1495 subscription_manager,
1496 &producer,
1497 &resource);
1498
1499 0 if (result != GLOBUS_SUCCESS)
1500 {
1501 0 goto out;
1502 }
1503
1504 0 result = globus_resource_get_resource_specific(
1505 resource,
1506 (void **) &subscription);
1507
1508 0 if (result != GLOBUS_SUCCESS)
1509 {
1510 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1511
1512 0 goto unlock_out;
1513 }
1514
1515 0 subscription->paused = GLOBUS_FALSE;
1516
1517 0 unlock_out:
1518 0 globus_resource_finish(resource);
1519 0 GlobusNotificationProducerUnlock(producer);
1520 0 out:
1521 0 return result;
1522 }
1523 /* globus_notification_subscription_resume() */
1524
1525 /**
1526 * Initialize a topic expression
1527 * @ingroup notification_topics
1528 * Initializes a topic expression to use the SimpleTopicExpression dialect
1529 * with the QName as the topic expression. The topic_expression can then
1530 * be used for creating topics or subscriptions.
1531 *
1532 * @param topic_expression
1533 * Pointer to a @a wsnt_TopicExpressionType which will have its
1534 * contents initialized to the value of @a expression.
1535 * @param expression
1536 * QName describing the topic.
1537 */
1538 globus_result_t
1539 globus_notification_producer_set_simple_topic_expression_contents(
1540 wsnt_TopicExpressionType * topic_expression,
1541 const xsd_QName * expression)
1542 100 {
1543 xsd_string tmp;
1544 globus_result_t result;
1545
1546 100 result = wsnt_TopicExpressionType_init_contents(topic_expression);
1547 100 if (result != GLOBUS_SUCCESS)
1548 {
1549 0 goto out;
1550 }
1551 100 tmp = globus_libc_strdup(SIMPLE_TOPIC_EXPRESSION);
1552
1553 100 if (tmp == NULL)
1554 {
1555 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1556
1557 0 goto destroy_topic_expression_contents_out;
1558 }
1559 100 result = xsd_string_init_contents_cstr(
1560 &topic_expression->_Dialect,
1561 tmp);
1562 100 if (result != GLOBUS_SUCCESS)
1563 {
1564 0 goto free_tmp_out;
1565 }
1566 100 tmp = NULL;
1567 100 result = xsd_any_init(&topic_expression->any);
1568 100 if (result != GLOBUS_SUCCESS)
1569 {
1570 0 goto destroy_topic_expression_contents_out;
1571 }
1572 100 topic_expression->any->any_info =
1573 &wstop_SimpleTopicExpression_contents_info;
1574 100 result = wstop_SimpleTopicExpression_copy(
1575 (wstop_SimpleTopicExpression **) &topic_expression->any->value,
1576 expression);
1577
1578 100 if (result != GLOBUS_SUCCESS)
1579 {
1580 goto destroy_topic_expression_contents_out;
1581
1582 0 destroy_topic_expression_contents_out:
1583 0 wsnt_TopicExpressionType_destroy_contents(topic_expression);
1584 }
1585 100 free_tmp_out:
1586 100 if (tmp != NULL)
1587 {
1588 0 free(tmp);
1589 }
1590 100 out:
1591 100 return result;
1592 }
1593 /* globus_notification_producer_set_simple_topic_expression_contents() */
1594
1595 #ifndef GLOBUS_DONT_DOCUMENT_INTERNAL
1596 static
1597 int
1598 globus_l_notification_producer_activate(void)
1599 271 {
1600 int rc;
1601
1602 271 rc = globus_module_activate(GLOBUS_COMMON_MODULE);
1603
1604 271 if (rc != GLOBUS_SUCCESS)
1605 {
1606 0 goto error_out;
1607 }
1608 271 GlobusDebugInit(GLOBUS_NOTIFICATION_PRODUCER, DEBUG INFO TRACE WARN ERROR);
1609
1610 271 rc = globus_module_activate(GLOBUS_WSRF_RESOURCE_MODULE);
1611 271 if (rc != GLOBUS_SUCCESS)
1612 {
1613 0 goto deactivate_common_out;
1614 }
1615 271 rc = globus_module_activate(NOTIFICATIONCONSUMERSERVICE_MODULE);
1616 271 if (rc != GLOBUS_SUCCESS)
1617 {
1618 0 goto deactivate_wsrf_resource_out;
1619 }
1620
1621 271 globus_mutex_init(&globus_i_notification_producer_lock, NULL);
1622
1623 271 rc = globus_hashtable_init(
1624 &globus_i_notification_subscriptions,
1625 15,
1626 globus_l_notification_epr_hash,
1627 globus_l_notification_epr_keyequal);
1628
1629 271 if (rc != GLOBUS_SUCCESS)
1630 {
1631 0 goto deactivate_consumerservice_out;
1632 }
1633
1634 271 rc = globus_hashtable_init(
1635 &globus_i_notification_producers,
1636 15,
1637 globus_hashtable_string_hash,
1638 globus_hashtable_string_keyeq);
1639
1640 271 if (rc != GLOBUS_SUCCESS)
1641 {
1642 0 goto destroy_subscriptions_out;
1643
1644 }
1645
1646 271 globus_l_notification_SubscriptionManager_handle = NULL;
1647 271 globus_l_notification_SubscriptionManager_service_descriptor = NULL;
1648
1649 271 return GLOBUS_SUCCESS;
1650 0 destroy_subscriptions_out:
1651 0 globus_hashtable_destroy(&globus_i_notification_subscriptions);
1652 0 deactivate_consumerservice_out:
1653 0 globus_module_deactivate(NOTIFICATIONCONSUMERSERVICE_MODULE);
1654 0 deactivate_wsrf_resource_out:
1655 0 globus_module_deactivate(GLOBUS_WSRF_RESOURCE_MODULE);
1656 0 deactivate_common_out:
1657 0 GlobusDebugDestroy(GLOBUS_NOTIFICATION_PRODUCER);
1658
1659 0 globus_module_deactivate(GLOBUS_COMMON_MODULE);
1660 0 error_out:
1661 0 return rc;
1662 }
1663 /* globus_l_notification_producer_activate() */
1664
1665 static
1666 int
1667 globus_l_notification_producer_deactivate(void)
1668 0 {
1669 0 globus_hashtable_destroy(&globus_i_notification_subscriptions);
1670 0 globus_hashtable_destroy(&globus_i_notification_producers);
1671 0 globus_mutex_destroy(&globus_i_notification_producer_lock);
1672 0 globus_module_deactivate(NOTIFICATIONCONSUMERSERVICE_MODULE);
1673 0 globus_module_activate(GLOBUS_WSRF_RESOURCE_MODULE);
1674
1675 0 GlobusDebugDestroy(GLOBUS_NOTIFICATION_PRODUCER);
1676
1677 0 globus_module_deactivate(GLOBUS_COMMON_MODULE);
1678
1679 0 return 0;
1680 }
1681 /* globus_l_notification_producer_deactivate() */
1682
1683 static
1684 void
1685 globus_l_notification_producer_atexit(void)
1686 271 {
1687 271 if (globus_l_notification_SubscriptionManager_handle != NULL)
1688 {
1689 23 globus_extension_release(
1690 globus_l_notification_SubscriptionManager_handle);
1691 23 globus_l_notification_SubscriptionManager_handle = NULL;
1692 23 globus_l_notification_SubscriptionManager_service_descriptor = NULL;
1693 23 globus_extension_deactivate(
1694 PAUSABLESUBSCRIPTIONMANAGERSERVICE_BASE_PATH);
1695 }
1696 271 }
1697 /* globus_l_notification_producer_atexit() */
1698
1699 static
1700 void
1701 globus_l_notification_activate_subscription_manager(void)
1702 23 {
1703 int rc;
1704 char * service_path;
1705
1706 23 service_path = GLOBUS_SERVICE_ENGINE_MODULE_PATH_PREFIX "/"
1707 PAUSABLESUBSCRIPTIONMANAGERSERVICE_BASE_PATH;
1708
1709 23 rc = globus_extension_activate(service_path);
1710
1711 23 if (rc != GLOBUS_SUCCESS)
1712 {
1713 0 goto out;
1714 }
1715
1716 23 globus_l_notification_SubscriptionManager_service_descriptor =
1717 globus_extension_lookup(
1718 &globus_l_notification_SubscriptionManager_handle,
1719 GLOBUS_SERVICE_REGISTRY,
1720 service_path);
1721 23 if (globus_l_notification_SubscriptionManager_service_descriptor == NULL)
1722 {
1723 0 globus_extension_deactivate(PAUSABLESUBSCRIPTIONMANAGERSERVICE_BASE_PATH);
1724 }
1725 23 out:
1726 return;
1727 }
1728 /* globus_l_notification_activate_subscription_manager() */
1729
1730 static
1731 int
1732 globus_l_notification_epr_hash(
1733 void * key,
1734 int limit)
1735 29 {
1736 globus_result_t result;
1737 char * reference_properties;
1738 29 int rc = 0;
1739
1740 29 if (key == NULL)
1741 {
1742 0 goto out;
1743 }
1744
1745 29 result = globus_l_notification_epr_get_reference_properties(
1746 key,
1747 &reference_properties);
1748 29 if (result != GLOBUS_SUCCESS)
1749 {
1750 0 goto out;
1751 }
1752
1753 29 rc = globus_hashtable_string_hash(reference_properties, limit);
1754
1755 29 free(reference_properties);
1756 29 out:
1757 29 return rc;
1758 }
1759 /* globus_l_notification_epr_hash() */
1760
1761 static
1762 int
1763 globus_l_notification_epr_keyequal(
1764 void * epr1,
1765 void * epr2)
1766 0 {
1767 globus_result_t result;
1768 char * reference_properties_1;
1769 char * reference_properties_2;
1770 0 int rc = 0;
1771
1772 0 if (epr1 == epr2)
1773 {
1774 0 rc = 1;
1775
1776 0 goto out;
1777 }
1778
1779 0 result = globus_l_notification_epr_get_reference_properties(
1780 epr1,
1781 &reference_properties_1);
1782 0 if (result != GLOBUS_SUCCESS)
1783 {
1784 0 goto out;
1785 }
1786
1787 0 result = globus_l_notification_epr_get_reference_properties(
1788 epr2,
1789 &reference_properties_2);
1790 0 if (result != GLOBUS_SUCCESS)
1791 {
1792 0 goto free_reference_properties_1;
1793 }
1794
1795 0 rc = (strcmp(reference_properties_1, reference_properties_2) == 0);
1796
1797 0 free(reference_properties_2);
1798 0 free_reference_properties_1:
1799 0 free(reference_properties_1);
1800 0 out:
1801 0 return rc;
1802 }
1803 /* globus_l_notification_epr_keyequal() */
1804
1805 static
1806 globus_result_t
1807 globus_l_notification_epr_get_reference_properties(
1808 const wsa_EndpointReferenceType * epr,
1809 char ** reference_properties)
1810 29 {
1811 globus_result_t result;
1812 29 xmlURIPtr uri = NULL;
1813 xmlChar * base_path;
1814 globus_extension_handle_t ext;
1815 globus_service_descriptor_t * service;
1816 char * tmp_path;
1817
1818 29 *reference_properties = NULL;
1819
1820 29 uri = xmlParseURI(epr->Address.base_value);
1821
1822 29 if (uri == NULL)
1823 {
1824 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1825
1826 0 goto out;
1827 }
1828
1829 29 base_path = (xmlChar*) uri->path;
1830
1831 87 while (*base_path == '/')
1832 {
1833 29 base_path++;
1834 }
1835 29 tmp_path = globus_common_create_string(
1836 GLOBUS_SERVICE_ENGINE_MODULE_PATH_PREFIX "/%s",
1837 base_path);
1838
1839 29 if (tmp_path == NULL)
1840 {
1841 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
1842
1843 0 goto free_uri_out;
1844 }
1845 29 service = globus_extension_lookup(
1846 &ext, GLOBUS_SERVICE_REGISTRY, tmp_path);
1847 29 if (service == NULL)
1848 {
1849 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1850
1851 0 goto free_tmp_path_out;
1852 }
1853
1854 29 result = service->get_resource_id(
1855 &epr->ReferenceParameters->any,
1856 reference_properties);
1857
1858 29 if (result != GLOBUS_SUCCESS)
1859 {
1860 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1861
1862 0 goto release_extension_out;
1863 }
1864
1865 29 release_extension_out:
1866 29 globus_extension_release(ext);
1867 29 free_tmp_path_out:
1868 29 free(tmp_path);
1869 29 free_uri_out:
1870 29 xmlFreeURI(uri);
1871 29 out:
1872 29 return result;
1873 }
1874 /* globus_l_notification_epr_get_reference_properties() */
1875
1876 /**
1877 * Looks up a subscription EPR.
1878 *
1879 * Finds the data structures associated with a particular EPR, and returns
1880 * locked references to the producer and the SubscriptionManager resource.
1881 * These can then be used to dispatch topic change notifications or to
1882 * destroy the resource.
1883 *
1884 * @param subscription_epr
1885 * EPR of a SubscriptionManager resource. This is the key to finding
1886 * the producer and the resource itself.
1887 * @param producer
1888 * Output parameter which will be updated to point to the producer
1889 * which created this subscription. The producer's lock will be acquired
1890 * if this function returns successfully.
1891 * @param subscription
1892 * Output parameter which will be updated to point to the
1893 * SubscriptionManager's WSRF resource. This resource reference must
1894 * be released by calling globus_resource_finish() or
1895 * globus_resource_destroy().
1896 *
1897 * @retval GLOBUS_SUCCESS
1898 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_NULL_PARAMETER
1899 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_UNKNOWN_RESOURCE
1900 */
1901 static
1902 globus_result_t
1903 globus_l_notification_producer_find_subscription(
1904 const wsa_EndpointReferenceType * subscription_epr,
1905 globus_i_notification_producer_t ** producer,
1906 globus_resource_t * subscription)
1907 0 {
1908 0 globus_result_t result = GLOBUS_SUCCESS;
1909 globus_i_notification_subscription_entry_t *
1910 subscription_entry;
1911
1912 0 if (subscription_epr == NULL || producer == NULL || subscription == NULL)
1913 {
1914 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
1915
1916 0 goto out;
1917 }
1918
1919 0 GlobusNotificationProducerGlobalLock();
1920
1921 0 subscription_entry = globus_hashtable_lookup(
1922 &globus_i_notification_subscriptions,
1923 (void *) subscription_epr);
1924
1925 0 if (subscription_entry == NULL)
1926 {
1927 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1928
1929 0 goto unlock_out;
1930 }
1931 0 *producer = subscription_entry->producer;
1932
1933 0 GlobusNotificationProducerLock(*producer);
1934
1935 0 result = globus_wsrf_core_get_resource_from_epr(
1936 (wsa_EndpointReferenceType *) subscription_epr,
1937 subscription);
1938
1939 0 if (result != GLOBUS_SUCCESS)
1940 {
1941 0 result = GLOBUS_NOTIFICATION_PRODUCER_UNKNOWN_RESOURCE();
1942
1943 0 goto unlock_producer_out;
1944 }
1945
1946 0 GlobusNotificationProducerGlobalUnlock();
1947
1948 0 return result;
1949
1950 0 unlock_producer_out:
1951 0 GlobusNotificationProducerUnlock(*producer);
1952
1953 /* Stale reference to producer---the subscription resource has been
1954 * destroyed
1955 */
1956 0 globus_hashtable_remove(
1957 &globus_i_notification_subscriptions,
1958 (void *) subscription_epr);
1959 0 wsa_EndpointReferenceType_destroy(subscription_entry->epr);
1960 0 free(subscription_entry);
1961 0 unlock_out:
1962 0 GlobusNotificationProducerGlobalUnlock();
1963
1964 0 *subscription = NULL;
1965 0 *producer = NULL;
1966 0 out:
1967 0 return result;
1968 }
1969 /* globus_l_notification_producer_find_subscription() */
1970
1971 /**
1972 * Create a new WSRF resource for a subscription.
1973 *
1974 * This function will generate a unique EPR for the new resource and
1975 * return it in the @a epr parameter. It will also modify the @a resource
1976 * parameter to point to a newly allocated (and locked) wsrf resource.
1977
1978 * The EPR's address is derived from the address of the producer which
1979 * is being used to create this subscription. The path of the EPR is
1980 * derived from the path in the service bindings. The reference properties
1981 * are generated using the Globus uuid generator.
1982 *
1983 * @param producer
1984 * Producer containing information about
1985 * the topics and resources associated with this notification producing
1986 * service. The lock in this producer must be locked when this function
1987 * is called.
1988 * @param Subscribe
1989 * Parameters passed to the Subscribe operation of the NotificationProducer
1990 * implementation.
1991 * @param epr
1992 * Pointer to an output parameter which will be modified to contain
1993 * information for referencing the new resource. The caller is responsible
1994 * for freeing this.
1995 * @param resource
1996 * Pointer to an output parameter which will be modified to contain
1997 * the newly allocated wsrf resource. The caller is responsible for
1998 * finishing the resource when it is not needed and also for
1999 * ultimately destroying it.
2000 *
2001 * @retval GLOBUS_SUCCESS
2002 * The new EPR was successfully created.
2003 * @retval GLOBUS_NOTIFICATION_PRODUCER_ERROR_TYPE_OUT_OF_MEMORY
2004 * Insufficient resources available to create a new EPR.
2005 */
2006 static
2007 globus_result_t
2008 globus_l_notification_subscription_create(
2009 const globus_notification_producer_t
2010 producer,
2011 const globus_service_engine_t engine,
2012 const wsnt_SubscribeType * Subscribe,
2013 wsa_EndpointReferenceType * epr,
2014 globus_resource_t * resource)
2015 29 {
2016 29 globus_result_t result = GLOBUS_SUCCESS;
2017 globus_uuid_t uuid;
2018 xsd_any * new_rp;
2019 int rc;
2020 char * contact;
2021
2022 29 if (resource == NULL)
2023 {
2024 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
2025
2026 0 goto out;
2027 }
2028
2029 29 *resource = NULL;
2030
2031 29 if (producer == NULL || engine == NULL || Subscribe == NULL || epr == NULL)
2032 {
2033 0 result = GLOBUS_NOTIFICATION_PRODUCER_NULL_PARAMETER();
2034
2035 0 goto out;
2036 }
2037
2038 29 result = globus_service_engine_get_contact(engine, &contact);
2039
2040 29 if (result != GLOBUS_SUCCESS)
2041 {
2042 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2043
2044 0 goto out;
2045 }
2046 /* Copy the EPR's address, replacing the path part of the EPR's address
2047 * with the path to the SubscriptionManager service
2048 */
2049 29 epr->Address.base_value = globus_libc_realloc(
2050 contact,
2051 strlen(contact) + strlen(PAUSABLESUBSCRIPTIONMANAGERSERVICE_BASE_PATH) + 1);
2052 29 if (epr->Address.base_value == NULL)
2053 {
2054 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2055
2056 0 goto free_contact_out;
2057 }
2058 29 contact = NULL;
2059 29 strcat(epr->Address.base_value, PAUSABLESUBSCRIPTIONMANAGERSERVICE_BASE_PATH);
2060
2061 /* Create reference properties for the resource we'll create */
2062 29 rc = globus_uuid_create(&uuid);
2063
2064 29 if (rc != GLOBUS_SUCCESS)
2065 {
2066 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2067
2068 0 goto destroy_epr_contents_out;
2069 }
2070 29 result = wsa_ReferenceParametersType_init(
2071 &epr->ReferenceParameters);
2072 29 if (result != GLOBUS_SUCCESS)
2073 {
2074 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2075
2076 0 goto destroy_epr_contents_out;
2077 }
2078
2079 29 new_rp = xsd_any_array_push(&epr->ReferenceParameters->any);
2080 29 if (new_rp == NULL)
2081 {
2082 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2083
2084 0 goto destroy_epr_contents_out;
2085 }
2086
2087 29 new_rp->any_info = &xsd_string_info;
2088
2089 29 xsd_QName_init(&new_rp->element);
2090 29 new_rp->element->Namespace = globus_libc_strdup(
2091 "http://www.globus.org/docs.oasis-open.org/wsn/b-2/provider");
2092 29 if (new_rp->element->Namespace == NULL)
2093 {
2094 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2095
2096 0 goto destroy_epr_contents_out;
2097 }
2098
2099 29 result = globus_resource_create(uuid.text, resource);
2100 29 if (result != GLOBUS_SUCCESS)
2101 {
2102 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2103
2104 0 goto destroy_epr_contents_out;
2105 }
2106
2107 29 new_rp->element->local = globus_libc_strdup("ResourceID");
2108 29 if (new_rp->element->local == NULL)
2109 {
2110 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2111
2112 0 goto destroy_epr_contents_out;
2113 }
2114 29 xsd_string_init((xsd_string **) &new_rp->value);
2115 29 if (new_rp->value == NULL)
2116 {
2117 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2118
2119 0 goto destroy_epr_contents_out;
2120 }
2121 29 *(xsd_string *) new_rp->value = globus_libc_strdup(uuid.text);
2122 29 if (*(xsd_string *) new_rp->value == NULL)
2123 {
2124 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2125
2126 0 goto destroy_epr_contents_out;
2127 }
2128
2129 /* We'll set the destroy time for the resource here. When we initialize
2130 * the providers' views of the resource, the ScheduledResourceTermination
2131 * provider will pick up this value and seed the appropriate RP
2132 */
2133 29 if (Subscribe->InitialTerminationTime != NULL)
2134 {
2135 time_t now;
2136 struct tm now_tm;
2137 globus_abstime_t initial_term_time;
2138
2139 0 switch (Subscribe->InitialTerminationTime->type)
2140 {
2141 case wsnt_AbsoluteOrRelativeTimeType_duration:
2142 0 now = time(NULL);
2143 0 if (gmtime_r(&now, &now_tm) == NULL)
2144 {
2145 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2146
2147 0 goto destroy_epr_contents_out;
2148 }
2149 0 result = globus_wsrf_core_add_duration(
2150 &now_tm,
2151 &Subscribe->InitialTerminationTime->value.duration);
2152 0 if (result != GLOBUS_SUCCESS)
2153 {
2154 0 goto destroy_epr_contents_out;
2155 }
2156 0 initial_term_time.tv_sec = globus_l_notification_timegm(&now_tm);
2157 0 initial_term_time.tv_nsec = 0;
2158 0 break;
2159 case wsnt_AbsoluteOrRelativeTimeType_dateTime:
2160 0 initial_term_time.tv_sec = globus_l_notification_timegm(
2161 &Subscribe->InitialTerminationTime->value.dateTime);
2162 0 initial_term_time.tv_nsec = 0L;
2163 0 break;
2164
2165 default:
2166 0 result = GLOBUS_NOTIFICATION_PRODUCER_SUBSCRIPTION_CREATION_FAILED();
2167
2168 0 goto destroy_epr_contents_out;
2169 }
2170
2171 0 result = globus_resource_set_destroy_time(
2172 *resource,
2173 &initial_term_time);
2174
2175 0 if (result != GLOBUS_SUCCESS)
2176 {
2177 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2178
2179 0 goto destroy_resource_out;
2180 }
2181 }
2182
2183 /* delayed from module activation to avoid recursive activation calls */
2184 29 globus_thread_once(
2185 &globus_l_subcription_manager_activate_once,
2186 globus_l_notification_activate_subscription_manager);
2187
2188 29 if (globus_l_notification_SubscriptionManager_service_descriptor
2189 == NULL)
2190 {
2191 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2192
2193 0 goto destroy_resource_out;
2194 }
2195 /* Initialize resource providers for the SubscriptionManager */
2196 29 result = globus_operation_table_init_resource(
2197 globus_l_notification_SubscriptionManager_service_descriptor->
2198 operations,
2199 epr);
2200
2201 29 if (result != GLOBUS_SUCCESS)
2202 {
2203 0 result = GLOBUS_NOTIFICATION_PRODUCER_OUT_OF_MEMORY();
2204
2205 0 goto destroy_resource_out;
2206 }
2207
2208 29 return result;
2209
2210 0 destroy_resource_out:
2211 0 globus_resource_destroy(*resource);
2212 0 *resource = NULL;
2213 0 destroy_epr_contents_out:
2214 0 wsa_EndpointReferenceType_destroy_contents(epr);
2215 0 memset(epr, 0, sizeof(wsa_EndpointReferenceType));
2216 0 free_contact_out:
2217 0 if (contact)
2218 {
2219 0 free(contact);
2220 }
2221 0 out:
2222 0 return result;
2223 }
2224 /* globus_l_notification_subscription_create() */
2225
2226 static
2227 void
2228 globus_l_notification_subscription_free(
2229 void * arg)
2230 6 {
2231 globus_i_notification_subscription_t *
2232 6 subscription = arg;
2233
2234 6 wsa_EndpointReferenceType_destroy(subscription->consumer_epr);
2235 6 globus_list_free(subscription->topics);
2236 6 free(subscription);
2237 6 }
2238 /* globus_l_notification_subscription_free() */
2239
2240 static
2241 void
2242 globus_l_notify_done(
2243 NotificationConsumerService_client_handle_t
2244 handle,
2245 void * callback_arg,
2246 globus_result_t result)
2247 80 {
2248 globus_i_notification_callback_info_t *
2249 80 callback_info = callback_arg;
2250 80 globus_mutex_lock(&callback_info->lock);
2251
2252 80 if (--callback_info->notification_count == 0)
2253 {
2254 77 globus_mutex_unlock(&callback_info->lock);
2255 77 globus_mutex_destroy(&callback_info->lock);
2256
2257 77 if (callback_info->callback)
2258 {
2259 0 callback_info->callback(
2260 callback_info->callback_arg,
2261 callback_info->topic_expression,
2262 callback_info->notification_message);
2263 }
2264
2265 77 wsnt_NotifyType_destroy(callback_info->Notify);
2266
2267 77 free(callback_info);
2268 }
2269 else
2270 {
2271 3 globus_mutex_unlock(&callback_info->lock);
2272 }
2273 80 NotificationConsumerService_client_destroy(handle);
2274 80 }
2275 /* globus_l_notify_done() */
2276
2277 static
2278 void
2279 globus_l_notification_callback_kickout(
2280 void * user_arg)
2281 0 {
2282 globus_i_notification_callback_info_t *
2283 0 callback_info = user_arg;
2284
2285 0 callback_info->callback(
2286 callback_info->callback_arg,
2287 callback_info->topic_expression,
2288 callback_info->notification_message);
2289
2290 0 globus_mutex_destroy(&callback_info->lock);
2291 0 wsnt_NotifyType_destroy(callback_info->Notify);
2292
2293 0 free(callback_info);
2294 0 }
2295 /* globus_l_notification_callback_kickout() */
2296
/**
 * Convert a broken-down UTC time to a time_t without consulting the local
 * timezone and without modifying the struct tm contents.
 *
 * The result is computed directly from the calendar fields (proleptic
 * Gregorian calendar, seconds since 1970-01-01T00:00:00Z), so it does not
 * depend on the TZ setting of the process and is not disturbed by daylight
 * saving transitions.
 *
 * Only tm_year, tm_mon, tm_mday, tm_hour, tm_min and tm_sec are read;
 * tm_isdst, tm_wday and tm_yday are ignored. Fields outside their usual
 * ranges (for example tm_sec == 90 or tm_mon == 13) are carried into the
 * larger units arithmetically, in the same way mktime() normalizes them.
 * NOTE(review): callers that add a duration field-by-field to a struct tm
 * before calling this function presumably rely on that normalization.
 *
 * @param tm
 *     Broken-down time, interpreted as UTC. Not modified.
 *
 * @return
 *     The corresponding number of seconds since the Epoch.
 */
static
time_t
globus_l_notification_timegm(
    const struct tm *                   tm)
{
    long                                year;
    long                                month;
    long                                era;
    long                                year_of_era;
    long                                day_of_year;
    long                                day_of_era;
    long                                days;

    year = (long) tm->tm_year + 1900L;
    month = (long) tm->tm_mon;

    /* Carry whole years out of the month field; leave month in [0, 11] */
    year += month / 12;
    month %= 12;
    if (month < 0)
    {
        month += 12;
        year--;
    }

    /*
     * Count years from 1 March so that the leap day is the last day of the
     * (shifted) year: March becomes month 0, February becomes month 11 of
     * the previous year.
     */
    if (month < 2)
    {
        month += 10;
        year--;
    }
    else
    {
        month -= 2;
    }

    /* The Gregorian calendar repeats every 400 years (146097 days) */
    era = (year >= 0 ? year : year - 399) / 400;
    year_of_era = year - era * 400;

    /* (153 * month + 2) / 5 is the number of days before shifted month */
    day_of_year = (153 * month + 2) / 5 + (long) tm->tm_mday - 1;
    day_of_era = year_of_era * 365
            + year_of_era / 4
            - year_of_era / 100
            + day_of_year;

    /* 719468 is the number of days from 0000-03-01 to 1970-01-01 */
    days = era * 146097L + day_of_era - 719468L;

    return (time_t) days * 86400
            + (time_t) tm->tm_hour * 3600
            + (time_t) tm->tm_min * 60
            + (time_t) tm->tm_sec;
}
/* globus_l_notification_timegm() */
2333