UDF в MySQL, json или то, как забрать обновления данных из БД 3

Posted by Андрей on Октябрь 30, 2010

Иногда необходимо забирать данные из БД MySQL в режиме реального времени во внешнюю систему, которая никак не связана с MySQL. Существует множество возможных решений, например, можно реализовать «слейва» MySQL, который бы хранил полученные данные во внешней системе.

Одно из возможных решений – сделать «выгрузку» данных из MySQL с помощью UDF (User Defined Functions) и триггеров. Для этого необходимо поставить слейв MySQL, на котором уже повесить на интересующие таблицы триггеры, которые с помощью UDF будут выгружать поток изменений таблиц во внешнюю систему. Слейв необходим, т.к. если триггеры поставить на мастере, то в случае отката транзакции действия, уже сделанные триггерами, откатить не получится, а на слейв попадают только зафиксированные транзакции. Второе,чтобы триггеры работали на слейве, тип репликации должен быть выставлен на STATEMENT-based.

Порывшись в одном интересном архиве UDF для MySQL я нашел несколько функций, которые мне подошли:

  • преобразование строки MySQL в json;
  • интерфейс с memcached.

В результате получился следующий план действий: данные модифицируются на мастере, реплицируются на слейв с помощью STATEMENT-репликации. В процессе репликации на слейве запускаются триггеры, формируют с помощью UDF пакет обновлений в JSON, и передают его во внешнюю очередь (memcacheq) по memcached-протоколу. Конечно, это не единственный возможный способ, но все UDF уже были почти готовы. После доделывания напильником UDF получился вполне стабильно работающий вариант.

Триггеры выглядят примерно следующим образом:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE FUNCTION kick_photos (row_id INT) RETURNS INT 
BEGIN 
SELECT memc_set('queue_db', (json_object('insert' AS action, 'photos' AS table_name, photos.id AS id, json_members('data', json_object(photos.user_id AS `user_id`,photos.width AS `width`,photos.created_at AS `created_at`,photos.filename AS `filename`,photos.parent_id AS `parent_id`,photos.content_type AS `content_type`,photos.height AS `height`,photos.thumbnail AS `thumbnail`,photos.size AS `size`))))) INTO @dummy FROM photos WHERE id = row_id; 
RETURN @dummy; 
END
 
CREATE TRIGGER photos_INSERT AFTER INSERT ON photos FOR EACH ROW 
SET @dummy = memc_set('queue_db', (json_object('insert' AS action, 'photos' AS table_name, NEW.id AS id, json_members('data', json_object(NEW.user_id AS `user_id`,NEW.parent_id AS `parent_id`,NEW.created_at AS `created_at`,NEW.filename AS `filename`,NEW.width AS `width`,NEW.content_type AS `content_type`,NEW.height AS `height`,NEW.thumbnail AS `thumbnail`,NEW.size AS `size`)))));
 
CREATE TRIGGER photos_DELETE BEFORE DELETE ON photos FOR EACH ROW 
SET @dummy = memc_set('queue_db', (json_object('delete' AS action, 'photos' AS table_name, OLD.id AS id, json_members('data', json_object(OLD.user_id AS `user_id`,OLD.parent_id AS `parent_id`,OLD.created_at AS `created_at`,OLD.filename AS `filename`,OLD.width AS `width`,OLD.content_type AS `content_type`,OLD.height AS `height`,OLD.thumbnail AS `thumbnail`,OLD.size AS `size`)))));
 
CREATE TRIGGER photos_UPDATE AFTER UPDATE ON photos FOR EACH ROW 
BEGIN 
IF json_object(OLD.user_id AS `user_id`,OLD.parent_id AS `parent_id`,OLD.created_at AS `created_at`,OLD.filename AS `filename`,OLD.width AS `width`,OLD.content_type AS `content_type`,OLD.height AS `height`,OLD.thumbnail AS `thumbnail`,OLD.size AS `size`) <> json_object(NEW.user_id AS `user_id`,NEW.parent_id AS `parent_id`,NEW.created_at AS `created_at`,NEW.filename AS `filename`,NEW.width AS `width`,NEW.content_type AS `content_type`,NEW.height AS `height`,NEW.thumbnail AS `thumbnail`,NEW.size AS `size`) THEN 
  SET @dummy = memc_set('queue_db', (json_object('update' AS action, 'photos' AS table_name, OLD.id AS id, json_members('new', json_object(NEW.user_id AS `user_id`,NEW.parent_id AS `parent_id`,NEW.created_at AS `created_at`,NEW.filename AS `filename`,NEW.width AS `width`,NEW.content_type AS `content_type`,NEW.height AS `height`,NEW.thumbnail AS `thumbnail`,NEW.size AS `size`)), json_members('old', json_object(OLD.user_id AS `user_id`,OLD.parent_id AS `parent_id`,OLD.created_at AS `created_at`,OLD.filename AS `filename`,OLD.width AS `width`,OLD.content_type AS `content_type`,OLD.height AS `height`,OLD.thumbnail AS `thumbnail`,OLD.size AS `size`))))); 
END IF; 
END;

Комментарии:

  • функция kick_photos позволяет скопировать строчку таблицы в очередь как пакет обновления типа «вставка», может использоваться для начального наполнения внешней системы;
  • триггеры на удаление и вставку просто формируют соответствующие пакеты;
  • триггер на обновление проверяет, действительно ли в пакете произошли изменения (например, мы можем использовать не все поля в пакете);
  • необходимо учесть, что работе FOREIGN KEY CONSTRAINT триггеры не вызываются (очередной прикол MySQL), т.е., например, при если при выполнении запроса на удаление из таблицы A будут по FOREIGN KEY удалятся записи из таблицы B, то в триггере на удаление из A необходимо отработать этот случай, т.к. триггеры на таблице B не будут вызваны.

Код UDF доступен на github, это – «подпиленный» код из репозитория UDF или собственные разработки:

Trackbacks

Use this link to trackback from your own site.

Comments

Leave a response

  1. Станислав aka korchasa Вт, 02 Ноя 2010 04:26:01 UTC

    А как дела с надежностью? Особенно интересует memcached.

  2. Андрей Вт, 02 Ноя 2010 09:12:53 UTC

    Korchasa, там на самом деле запись идет в memcacheq.

    Так что надежность на уровне надежности Berkley DB.

    Я использую следующую схему: на сервере MySQL локально запускается memcacheq до MySQL, затем запускается MySQL, и начинается процесс репликации.

    Дефект состоит в том, что если memcacheq не будет работать, то триггеры не будут ждать его появления или прерывать процесс репликации. Но это можно доделать :)

    По моему опыту ни одной проблемы с memcacheq не было.

  3. Станислав aka korchasa Вт, 02 Ноя 2010 12:39:58 UTC

    Дефект состоит в том, что если memcacheq не будет работать, то триггеры не будут ждать его появления или прерывать процесс репликации. Но это можно доделать Ну собственно такие «подводности» и интересовали. Спасибо.

Comments